Browse Source
* [BUG-#5678][Registry]fix registry init node miss (#5686) * [Improvement][UI] Update the update time after the user information is successfully modified (#5684) * improve edit the userinfo success, but the updatetime is not the latest. * Improved shell task execution result log information, adding process.waitFor() and process.exitValue() information to the original log (#5691) Co-authored-by: shenglm <shenglm840722@126.com> * [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603) * add globalParams new plan with varPool * add unit test * add python task varPoolParams Co-authored-by: wangxj <wangxj31> * Issue robot translation judgment changed to Chinese (#5694) Co-authored-by: chenxingchun <438044805@qq.com> * the update function should use post instead of get (#5703) * enhance form verify (#5696) * checkState only supports %s not {} (#5711) * [Fix-5701]When deleting a user, the accessToken associated with the user should also be deleted (#5697) * update * fix the codestyle error * fix the compile error * support rollback * [Fix-5699][UI] Fix update user error in user information (#5700) * [Improvement] the automatically generated spi service name in alert-plugin is wrong (#5676) * bug fix the auto generated spi service can't be recongized * include a new method * [Improvement-5622][project management] Modify the title (#5723) * [Fix-5714] When updating the existing alarm instance, the creation time should't be updated (#5715) * add a new init method. * [Fix#5758] There are some problems in the api documentation that need to be improved (#5759) * add the necessary parameters * openapi improve * fix code style error * [FIX-#5721][master-server] Global params parameter missing (#5757) Co-authored-by: wangxj <wangxj31> * [Fix-5738][UI] The cancel button in the pop-up dialog of `batch copy` and `batch move` doesn't work. (#5739) * Update relatedItems.vue * Update relatedItems.vue * [Improvement#5741][Worker] Improve task process status log (#5776) * [Improvement-5773][server] need to support two parameters related to task (#5774) * add some new parameter for task * restore official properties * improve imports * modify a variable's name Co-authored-by: jiang hua <jiang.hua@zhaopin.com.cn> * [FIX-5786][Improvement][Server] When the Worker turns down, the MasterServer cannot handle the Remove event correctly and throws NPE * [Improvement][Worker] Task log may be lost #5775 (#5783) * [Imporvement #5725][CheckStyle] upgrade checkstyle file (#5789) * [Imporvement #5725][CheckStyle] upgrade checkstyle file Upgrade checkstyle.xml to support checkstyle version 8.24+ * change ci checkstyle version * [Fix-5795][Improvement][Server] The starttime field in the HttpTask log is not displayed as expected. (#5796) * improve timestamp format make the startime in the log of httptask to be easier to read. * fix bad code smell and update the note. * [Imporvement #5621][job instance] start-time and end-time (#5621) (#5797) ·the list of workflow instances is sorted by start time and end time ·This closes #5621 * fix (#5803) Co-authored-by: shuangbofu <fusb@tuya.com> * fix: Remove duplicate "registryClient.close" method calls (#5805) Co-authored-by: wen-hemin <wenhemin@apache.com> * [Improvement][SPI] support load single plugin (#5794) change load operation of 'registry.plugin.dir' * [Improvement][Api Module] refactor registry client, remove spring annotation (#5814) * fix: refactor registry client, remove spring annotation * fix UT * fix UT * fix checkstyle * fix UT * fix UT * fix UT * fix: Rename RegistryCenterUtils method name Co-authored-by: wen-hemin <wenhemin@apache.com> * [Fix-5699][UI] Fix update user error in user information introduced by #5700 (#5735) * [Fix-5726] When we used the UI page, we found some problems such as parameter validation, parameter update shows success but actually work (#5727) * enhance the validation in UI * enchance form verifaction * simplify disable condition * fix: Remove unused class (#5833) Co-authored-by: wen-hemin <wenhemin@apache.com> * [fix-5737] [Bug][Datasource] datsource other param check error (#5835) Co-authored-by: wanggang <wanggy01@servyou.com.cn> * [Fix-5719][K8s] Fix Ingress tls: got map expected array On TLS enabled On Kubernetes [Fix-5719][K8s] Fix Ingress tls: got map expected array On TLS enabled On Kubernetes * [Fix-5825][BUG][WEB] the resource tree in the process definition of latest dev branch can't display correctly (#5826) * resoures-shows-error * fix codestyle error * add license header for new js * fix codesmell * [Improvement-5852][server] Support two parameters related to task for the rest of type of tasks. (#5867) * provide two system parameters to support the rest of type of tasks * provide two system parameters to support the rest of type of tasks * improve test conversion * [Improvement][Fix-5769][UI]When we try to delete the existing dag, the console in web browser would shows exception (#5770) * fix bug * cache the this variable * Avoid self name * fix code style compile error * [Fix-5781][UT] Fix test coverage in sonar (#5817) * build(UT): make jacoco running in offline-instrumentation issue: #5781 * build(UT): remove the jacoco agent dependency in microbench issue: #5781 * [Fix-5808][Server] When we try to transfer data using datax between different types of data sources, the worker will exit with ClassCastException (#5809) * bug fix * fix bug * simplify the code format * add a new parameter to make it easier to understand. * [Fix-5830][Improvement][UI] Improve the selection style in dag edit dialog (#5829) * improve the selection style * update another file * remove unnecessary css part. * [Fix-5904][upgrade]fix dev branch upgrade mysql sql script error (#5821) * fix dev branch upgrade mysql sql script error. * Update naming convention. * [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832) * fix: refactor api utils class, remove spring annotation. * fix: Optimization comments Co-authored-by: wen-hemin <wenhemin@apache.com> * correct the wrong annotion from zk queue implemented to java priority blocking queue (#5906) Co-authored-by: ywang46 <ywang46@paypal.com> * Add a Gitter chat badge to README.md (#5883) * Add Gitter badge * Update README.md Co-authored-by: David <dailidong66@gmail.com> * ci: improve maven connection in CI builds (#5924) issue: #5921 * [Improvement][Master]fix typo (#5934) ·fix typo in MasterBaseTaskExecThread * [Fix-5886][server] Enhanced scheduler delete check (#5936) * Add:Name verification remove the first and last spaces. * Update: wrong word: 'WAITTING' ->'WAITING' * Add: Strengthen verification Co-authored-by: Squid <2824638304@qq.com> * [Improvement-5880][api] Optimized data structure of pagination query API results (#5895) * [5880][refactor]Optimized data structure of pagination query API results - refactor PageInfo and delete returnDataListPaging in API - modify the related Controller and Service and the corresponding Test * Merge branch 'dev' of github.com:apache/dolphinscheduler into dev Conflicts: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java Co-authored-by: 蔡泽华 <sorea1k@163.com> * [IMPROVEMENT]fix mysql comment error (#5959) * [Improvement][Api]fix typo (#5960) * [Imporvement #5621][job instance] start-time and end-time (#5621) ·the list of workflow instances is sorted by start time and end time ·This closes #5621 * [FIX-5975]queryLastRunningProcess sql in ProcessInstanceMapper.xml (#5980) * [NEW FEATURE][FIX-4385] compensation task add the ability to configure parallelism (#5912) * update * web improved * improve the ui * add the ability to configure the parallelism * update variables * enhance the ut and add necessary note * fix code style * fix code style issue * ensure the complation task in parallel mode can run the right numbers of tasks. * [Improvement][dao]When I search for the keyword description, the web UI shows empty (#5952) * [Bug][WorkerServer] SqlTask NullPointerException #5549 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword Modify User, the web UI shows empty #5428 * [Improvement][dao]When I search for the keyword description, the web UI shows empty #5428 * fix the readme typing issue (#5998) * Fix unchecked type conversions * Use indentation level reported by checkstyle * Reorganize CI workflows to fasten the wasted time and resources (#6011) * Add standalone server module to make it easier to develop (#6022) * Task node of SWITCH (#5939) * [Feature-#5273][server-master] Task node of SWITCH (#5922) Co-authored-by: wangxj <wangxj31> * remove description of bonecp (#6030) Co-authored-by: shaojwu <shaojwu@ebay.com> * [Improvement][Api Module]split alert group list-paging interface (#5941) * [Improvement][Api Module]split alert group list-paging interface * [FIX-#6007]Wrong complement date (#6026) * [FIX-#6007]Wrong complement date * [style]Wrong complement date * [Improvement-6024][dist] Remove useless packaging commands (#6029) ·Remove useless packaging commands in dolphinscheduler-bin.xml This closes #6024 Co-authored-by: mask <liuhu@zhiyoutec.com> * [FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909) * fix the npe in MasterExec * fix the compile error * Add `.asf.yaml` to easily set the GitHub metadata (#6035) * fix dead server cannot stop (#6046) * Enhancement Translation (#6042) * replaced Loading... with i18n * modified Edit zh_CN translation * Delete zh_CN.js Co-authored-by: David <dailidong66@gmail.com> * fix bug #6053 zh_CN.js is lost * [Fix-6038][ui] width of "SQL Statement" in Dag FormLineModal will be shrunk if sql line is too long (#6040) This closes #6038 * [Improvement] Fix inefficient map iterator (#6004) * Fix inefficient map iterator * Use forEach and remove call to valueOf * Modify AbstractParameters * Enhance `StandaloneServer` so that we don't need to update the version number manually (#6074) * Remove invalid character in `.asf.yaml` (#6075) * Remove invalid character `\n` in `.asf.yaml` (#6077) It turns out that the invalid character is `\n` * Add alert server into standalone-server as well and some minor polish (#6087) * Support starting standalone server in Docker image (#6102) Also remove unused class * [Feature-4355][Master-Worker-API] improvements of master and scheduler module (#6095) * [Feature-4355][Master-Worker-API] improvements of master and scheduler module (#6085) * master refactor: 1. spi for task submit and other actions(pause, kill) 2. remove threads for process instance and task instance. 3. add events for process instance and task instance * ut npe * add try catch * code style * fix critical bugs * fix critical bugs * fix critical bugs * fix critical bugs * Remove unused params in SwitchTaskTest (#6109) * fix ut * fix ut * fix ut * fix ut * fix server module ut * fix ut Co-authored-by: Kirs <acm_master@163.com> Co-authored-by: kyoty <echohlne@gmail.com> Co-authored-by: ji04xiaogang <ji04xiaogang@163.com> Co-authored-by: shenglm <shenglm840722@126.com> Co-authored-by: wangxj3 <857234426@qq.com> Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Co-authored-by: chenxingchun <438044805@qq.com> Co-authored-by: Shiwen Cheng <chengshiwen0103@gmail.com> Co-authored-by: Jianchao Wang <akingchao@qq.com> Co-authored-by: Tanvi Moharir <74228962+tanvimoharir@users.noreply.github.com> Co-authored-by: Hua Jiang <jianghuachinacom@163.com> Co-authored-by: jiang hua <jiang.hua@zhaopin.com.cn> Co-authored-by: Wenjun Ruan <861923274@qq.com> Co-authored-by: Tandoy <56899730+Tandoy@users.noreply.github.com> Co-authored-by: 傅双波 <786183073@qq.com> Co-authored-by: shuangbofu <fusb@tuya.com> Co-authored-by: wen-hemin <39549317+wen-hemin@users.noreply.github.com> Co-authored-by: wen-hemin <wenhemin@apache.com> Co-authored-by: geosmart <geosmart@hotmail.com> Co-authored-by: wanggang <wanggy01@servyou.com.cn> Co-authored-by: AzureCN <colorazure@163.com> Co-authored-by: 深刻 <tsund@qq.com> Co-authored-by: zhuangchong <37063904+zhuangchong@users.noreply.github.com> Co-authored-by: Yao WANG <Yao.MR.CN@gmail.com> Co-authored-by: ywang46 <ywang46@paypal.com> Co-authored-by: The Gitter Badger <badger@gitter.im> Co-authored-by: David <dailidong66@gmail.com> Co-authored-by: Squidyu <1297554122@qq.com> Co-authored-by: Squid <2824638304@qq.com> Co-authored-by: soreak <60459867+soreak@users.noreply.github.com> Co-authored-by: 蔡泽华 <sorea1k@163.com> Co-authored-by: yimaixinchen <yimaixinchen@163.com> Co-authored-by: atai-555 <74188560+atai-555@users.noreply.github.com> Co-authored-by: didiaode18 <563646039@qq.com> Co-authored-by: Roy <yongjuncao1213@gmail.com> Co-authored-by: lyxell <alyxell@kth.se> Co-authored-by: Wenjun Ruan <wenjun@apache.org> Co-authored-by: kezhenxu94 <kezhenxu94@apache.org> Co-authored-by: myangle1120 <942542838@qq.com> Co-authored-by: gabry.wu <gabrywu@apache.org> Co-authored-by: shaojwu <shaojwu@ebay.com> Co-authored-by: Shukun Zhang <60541766+andream7@users.noreply.github.com> Co-authored-by: linquan <1175687813@qq.com> Co-authored-by: mask <39329477+Narcasserun@users.noreply.github.com> Co-authored-by: mask <liuhu@zhiyoutec.com> Co-authored-by: RichardStark <49977764+RichardStark@users.noreply.github.com> Co-authored-by: lenboo <baoliang.leon@gmail.com> Co-authored-by: lilyzhou <lj_zhou@outlook.com> Co-authored-by: OS <29528966+lenboo@users.noreply.github.com> Co-authored-by: junfan.zhang <zuston.shacha@gmail.com> Co-authored-by: JinyLeeChina <297062848@qq.com>2.0.7-release
JinyLeeChina
3 years ago
committed by
GitHub
161 changed files with 5840 additions and 2762 deletions
@ -0,0 +1,44 @@
|
||||
# |
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
|
||||
github: |
||||
description: Apache DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing various types of jobs available out of box. |
||||
homepage: https://dolphinscheduler.apache.org/ |
||||
labels: |
||||
- airflow |
||||
- schedule |
||||
- job-scheduler |
||||
- oozie |
||||
- task-scheduler |
||||
- azkaban |
||||
- distributed-schedule-system |
||||
- workflow-scheduling-system |
||||
- etl-dependency |
||||
- workflow-platform |
||||
- cronjob-schedule |
||||
- job-schedule |
||||
- task-schedule |
||||
- workflow-schedule |
||||
- data-schedule |
||||
enabled_merge_buttons: |
||||
squash: true |
||||
merge: false |
||||
rebase: false |
||||
protected_branches: |
||||
dev: |
||||
required_status_checks: |
||||
strict: true |
@ -1,60 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.utils; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNull; |
||||
|
||||
import java.util.Arrays; |
||||
|
||||
import org.junit.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class FuncUtilsTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FuncUtilsTest.class); |
||||
|
||||
/** |
||||
* Test mkString |
||||
*/ |
||||
@Test |
||||
public void testMKString() { |
||||
|
||||
//Define users list
|
||||
Iterable<String> users = Arrays.asList("user1", "user2", "user3"); |
||||
//Define split
|
||||
String split = "|"; |
||||
|
||||
//Invoke mkString with correctParams
|
||||
String result = FuncUtils.mkString(users, split); |
||||
logger.info(result); |
||||
|
||||
//Expected result string
|
||||
assertEquals("user1|user2|user3", result); |
||||
|
||||
//Null list expected return null
|
||||
result = FuncUtils.mkString(null, split); |
||||
assertNull(result); |
||||
|
||||
//Null split expected return null
|
||||
result = FuncUtils.mkString(users, null); |
||||
assertNull(result); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,111 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.enums; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* state event |
||||
*/ |
||||
public class StateEvent { |
||||
|
||||
/** |
||||
* origin_pid-origin_task_id-process_instance_id-task_instance_id |
||||
*/ |
||||
private String key; |
||||
|
||||
private StateEventType type; |
||||
|
||||
private ExecutionStatus executionStatus; |
||||
|
||||
private int taskInstanceId; |
||||
|
||||
private int processInstanceId; |
||||
|
||||
private String context; |
||||
|
||||
private Channel channel; |
||||
|
||||
public ExecutionStatus getExecutionStatus() { |
||||
return executionStatus; |
||||
} |
||||
|
||||
public void setExecutionStatus(ExecutionStatus executionStatus) { |
||||
this.executionStatus = executionStatus; |
||||
} |
||||
|
||||
public int getTaskInstanceId() { |
||||
return taskInstanceId; |
||||
} |
||||
|
||||
public int getProcessInstanceId() { |
||||
return processInstanceId; |
||||
} |
||||
|
||||
public void setProcessInstanceId(int processInstanceId) { |
||||
this.processInstanceId = processInstanceId; |
||||
} |
||||
|
||||
public String getContext() { |
||||
return context; |
||||
} |
||||
|
||||
public void setContext(String context) { |
||||
this.context = context; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public Channel getChannel() { |
||||
return channel; |
||||
} |
||||
|
||||
public void setChannel(Channel channel) { |
||||
this.channel = channel; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "State Event :" |
||||
+ "key: " + key |
||||
+ " type: " + type.toString() |
||||
+ " executeStatus: " + executionStatus |
||||
+ " task instance id: " + taskInstanceId |
||||
+ " process instance id: " + processInstanceId |
||||
+ " context: " + context |
||||
; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
public void setType(StateEventType type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public StateEventType getType() { |
||||
return this.type; |
||||
} |
||||
} |
@ -0,0 +1,91 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.task.switchtask; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.DependentRelation; |
||||
import org.apache.dolphinscheduler.common.process.ResourceInfo; |
||||
import org.apache.dolphinscheduler.common.task.AbstractParameters; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
public class SwitchParameters extends AbstractParameters { |
||||
|
||||
private DependentRelation dependRelation; |
||||
private String relation; |
||||
private List<String> nextNode; |
||||
|
||||
@Override |
||||
public boolean checkParameters() { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public List<ResourceInfo> getResourceFilesList() { |
||||
return new ArrayList<>(); |
||||
} |
||||
|
||||
private int resultConditionLocation; |
||||
private List<SwitchResultVo> dependTaskList; |
||||
|
||||
public DependentRelation getDependRelation() { |
||||
return dependRelation; |
||||
} |
||||
|
||||
public void setDependRelation(DependentRelation dependRelation) { |
||||
this.dependRelation = dependRelation; |
||||
} |
||||
|
||||
public int getResultConditionLocation() { |
||||
return resultConditionLocation; |
||||
} |
||||
|
||||
public void setResultConditionLocation(int resultConditionLocation) { |
||||
this.resultConditionLocation = resultConditionLocation; |
||||
} |
||||
|
||||
public String getRelation() { |
||||
return relation; |
||||
} |
||||
|
||||
public void setRelation(String relation) { |
||||
this.relation = relation; |
||||
} |
||||
|
||||
public List<SwitchResultVo> getDependTaskList() { |
||||
return dependTaskList; |
||||
} |
||||
|
||||
public void setDependTaskList(List<SwitchResultVo> dependTaskList) { |
||||
this.dependTaskList = dependTaskList; |
||||
} |
||||
|
||||
public List<String> getNextNode() { |
||||
return nextNode; |
||||
} |
||||
|
||||
public void setNextNode(Object nextNode) { |
||||
if (nextNode instanceof String) { |
||||
List<String> nextNodeList = new ArrayList<>(); |
||||
nextNodeList.add(String.valueOf(nextNode)); |
||||
this.nextNode = nextNodeList; |
||||
} else { |
||||
this.nextNode = (ArrayList) nextNode; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,49 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.task.switchtask; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
public class SwitchResultVo { |
||||
|
||||
private String condition; |
||||
private List<String> nextNode; |
||||
|
||||
public String getCondition() { |
||||
return condition; |
||||
} |
||||
|
||||
public void setCondition(String condition) { |
||||
this.condition = condition; |
||||
} |
||||
|
||||
public List<String> getNextNode() { |
||||
return nextNode; |
||||
} |
||||
|
||||
public void setNextNode(Object nextNode) { |
||||
if (nextNode instanceof String) { |
||||
List<String> nextNodeList = new ArrayList<>(); |
||||
nextNodeList.add(String.valueOf(nextNode)); |
||||
this.nextNode = nextNodeList; |
||||
} else { |
||||
this.nextNode = (ArrayList) nextNode; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,50 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.vo; |
||||
|
||||
/** |
||||
* AlertGroupVo |
||||
*/ |
||||
public class AlertGroupVo { |
||||
|
||||
/** |
||||
* primary key |
||||
*/ |
||||
private int id; |
||||
/** |
||||
* group_name |
||||
*/ |
||||
private String groupName; |
||||
|
||||
public int getId() { |
||||
return id; |
||||
} |
||||
|
||||
public void setId(int id) { |
||||
this.id = id; |
||||
} |
||||
|
||||
public String getGroupName() { |
||||
return groupName; |
||||
} |
||||
|
||||
public void setGroupName(String groupName) { |
||||
this.groupName = groupName; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,72 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* process host update |
||||
*/ |
||||
public class HostUpdateCommand implements Serializable { |
||||
|
||||
/** |
||||
* task id |
||||
*/ |
||||
private int taskInstanceId; |
||||
|
||||
private String processHost; |
||||
|
||||
public int getTaskInstanceId() { |
||||
return taskInstanceId; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public String getProcessHost() { |
||||
return processHost; |
||||
} |
||||
|
||||
public void setProcessHost(String processHost) { |
||||
this.processHost = processHost; |
||||
} |
||||
|
||||
/** |
||||
* package request command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "HostUpdateCommand{" |
||||
+ "taskInstanceId=" + taskInstanceId |
||||
+ "host=" + processHost |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,83 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
public class HostUpdateResponseCommand implements Serializable { |
||||
|
||||
private int taskInstanceId; |
||||
|
||||
private String processHost; |
||||
|
||||
private int status; |
||||
|
||||
public HostUpdateResponseCommand(int taskInstanceId, String processHost, int code) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
this.processHost = processHost; |
||||
this.status = code; |
||||
} |
||||
|
||||
public int getTaskInstanceId() { |
||||
return this.taskInstanceId; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public String getProcessHost() { |
||||
return this.processHost; |
||||
} |
||||
|
||||
public void setProcessHost(String processHost) { |
||||
this.processHost = processHost; |
||||
} |
||||
|
||||
public int getStatus() { |
||||
return status; |
||||
} |
||||
|
||||
public void setStatus(int status) { |
||||
this.status = status; |
||||
} |
||||
|
||||
/** |
||||
* package request command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "HostUpdateResponseCommand{" |
||||
+ "taskInstanceId=" + taskInstanceId |
||||
+ "host=" + processHost |
||||
+ '}'; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,131 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* db task final result response command |
||||
*/ |
||||
public class StateEventChangeCommand implements Serializable { |
||||
|
||||
private String key; |
||||
|
||||
private ExecutionStatus sourceStatus; |
||||
|
||||
private int sourceProcessInstanceId; |
||||
|
||||
private int sourceTaskInstanceId; |
||||
|
||||
private int destProcessInstanceId; |
||||
|
||||
private int destTaskInstanceId; |
||||
|
||||
public StateEventChangeCommand() { |
||||
super(); |
||||
} |
||||
|
||||
public StateEventChangeCommand(int sourceProcessInstanceId, int sourceTaskInstanceId, |
||||
ExecutionStatus sourceStatus, |
||||
int destProcessInstanceId, |
||||
int destTaskInstanceId |
||||
) { |
||||
this.key = String.format("%d-%d-%d-%d", |
||||
sourceProcessInstanceId, |
||||
sourceTaskInstanceId, |
||||
destProcessInstanceId, |
||||
destTaskInstanceId); |
||||
|
||||
this.sourceStatus = sourceStatus; |
||||
this.sourceProcessInstanceId = sourceProcessInstanceId; |
||||
this.sourceTaskInstanceId = sourceTaskInstanceId; |
||||
this.destProcessInstanceId = destProcessInstanceId; |
||||
this.destTaskInstanceId = destTaskInstanceId; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
/** |
||||
* package response command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.STATE_EVENT_REQUEST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "StateEventResponseCommand{" |
||||
+ "key=" + key |
||||
+ '}'; |
||||
} |
||||
|
||||
public ExecutionStatus getSourceStatus() { |
||||
return sourceStatus; |
||||
} |
||||
|
||||
public void setSourceStatus(ExecutionStatus sourceStatus) { |
||||
this.sourceStatus = sourceStatus; |
||||
} |
||||
|
||||
public int getSourceProcessInstanceId() { |
||||
return sourceProcessInstanceId; |
||||
} |
||||
|
||||
public void setSourceProcessInstanceId(int sourceProcessInstanceId) { |
||||
this.sourceProcessInstanceId = sourceProcessInstanceId; |
||||
} |
||||
|
||||
public int getSourceTaskInstanceId() { |
||||
return sourceTaskInstanceId; |
||||
} |
||||
|
||||
public void setSourceTaskInstanceId(int sourceTaskInstanceId) { |
||||
this.sourceTaskInstanceId = sourceTaskInstanceId; |
||||
} |
||||
|
||||
public int getDestProcessInstanceId() { |
||||
return destProcessInstanceId; |
||||
} |
||||
|
||||
public void setDestProcessInstanceId(int destProcessInstanceId) { |
||||
this.destProcessInstanceId = destProcessInstanceId; |
||||
} |
||||
|
||||
public int getDestTaskInstanceId() { |
||||
return destTaskInstanceId; |
||||
} |
||||
|
||||
public void setDestTaskInstanceId(int destTaskInstanceId) { |
||||
this.destTaskInstanceId = destTaskInstanceId; |
||||
} |
||||
} |
@ -0,0 +1,78 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* db task final result response command |
||||
*/ |
||||
public class StateEventResponseCommand implements Serializable { |
||||
|
||||
private String key; |
||||
private int status; |
||||
|
||||
public StateEventResponseCommand() { |
||||
super(); |
||||
} |
||||
|
||||
public StateEventResponseCommand(int status, String key) { |
||||
this.status = status; |
||||
this.key = key; |
||||
} |
||||
|
||||
public int getStatus() { |
||||
return status; |
||||
} |
||||
|
||||
public void setStatus(int status) { |
||||
this.status = status; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
/** |
||||
* package response command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.DB_TASK_RESPONSE); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "StateEventResponseCommand{" |
||||
+ "key=" + key |
||||
+ ", status=" + status |
||||
+ '}'; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,125 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.processor; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; |
||||
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* task callback service |
||||
*/ |
||||
@Service |
||||
public class StateEventCallbackService { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(StateEventCallbackService.class); |
||||
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200}; |
||||
|
||||
/** |
||||
* remote channels |
||||
*/ |
||||
private static final ConcurrentHashMap<String, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>(); |
||||
|
||||
/** |
||||
* netty remoting client |
||||
*/ |
||||
private final NettyRemotingClient nettyRemotingClient; |
||||
|
||||
public StateEventCallbackService() { |
||||
final NettyClientConfig clientConfig = new NettyClientConfig(); |
||||
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); |
||||
} |
||||
|
||||
/** |
||||
* add callback channel |
||||
* |
||||
* @param channel channel |
||||
*/ |
||||
public void addRemoteChannel(String host, NettyRemoteChannel channel) { |
||||
REMOTE_CHANNELS.put(host, channel); |
||||
} |
||||
|
||||
/** |
||||
* get callback channel |
||||
* |
||||
* @param host |
||||
* @return callback channel |
||||
*/ |
||||
private NettyRemoteChannel newRemoteChannel(Host host) { |
||||
Channel newChannel; |
||||
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress()); |
||||
if (nettyRemoteChannel != null) { |
||||
if (nettyRemoteChannel.isActive()) { |
||||
return nettyRemoteChannel; |
||||
} |
||||
} |
||||
newChannel = nettyRemotingClient.getChannel(host); |
||||
if (newChannel != null) { |
||||
return newRemoteChannel(newChannel, host.getAddress()); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
public int pause(int ntries) { |
||||
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length]; |
||||
} |
||||
|
||||
private NettyRemoteChannel newRemoteChannel(Channel newChannel, long opaque, String host) { |
||||
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque); |
||||
addRemoteChannel(host, remoteChannel); |
||||
return remoteChannel; |
||||
} |
||||
|
||||
private NettyRemoteChannel newRemoteChannel(Channel newChannel, String host) { |
||||
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); |
||||
addRemoteChannel(host, remoteChannel); |
||||
return remoteChannel; |
||||
} |
||||
|
||||
/** |
||||
* remove callback channels |
||||
*/ |
||||
public void remove(String host) { |
||||
REMOTE_CHANNELS.remove(host); |
||||
} |
||||
|
||||
/** |
||||
* send result |
||||
* |
||||
* @param command command |
||||
*/ |
||||
public void sendResult(String address, int port, Command command) { |
||||
logger.info("send result, host:{}, command:{}", address, command.toString()); |
||||
Host host = new Host(address, port); |
||||
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); |
||||
if (nettyRemoteChannel != null) { |
||||
nettyRemoteChannel.writeAndFlush(command); |
||||
} |
||||
} |
||||
} |
@ -1,175 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.master.future; |
||||
|
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.Iterator; |
||||
import java.util.LinkedList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
/** |
||||
* task future |
||||
*/ |
||||
public class TaskFuture { |
||||
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class); |
||||
|
||||
private final static ConcurrentHashMap<Long,TaskFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256); |
||||
|
||||
/** |
||||
* request unique identification |
||||
*/ |
||||
private final long opaque; |
||||
|
||||
/** |
||||
* timeout |
||||
*/ |
||||
private final long timeoutMillis; |
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1); |
||||
|
||||
private final long beginTimestamp = System.currentTimeMillis(); |
||||
|
||||
/** |
||||
* response command |
||||
*/ |
||||
private AtomicReference<Command> responseCommandReference = new AtomicReference<>(); |
||||
|
||||
private volatile boolean sendOk = true; |
||||
|
||||
private AtomicReference<Throwable> causeReference; |
||||
|
||||
public TaskFuture(long opaque, long timeoutMillis) { |
||||
this.opaque = opaque; |
||||
this.timeoutMillis = timeoutMillis; |
||||
FUTURE_TABLE.put(opaque, this); |
||||
} |
||||
|
||||
/** |
||||
* wait for response |
||||
* @return command |
||||
* @throws InterruptedException if error throws InterruptedException |
||||
*/ |
||||
public Command waitResponse() throws InterruptedException { |
||||
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); |
||||
return this.responseCommandReference.get(); |
||||
} |
||||
|
||||
/** |
||||
* put response |
||||
* |
||||
* @param responseCommand responseCommand |
||||
*/ |
||||
public void putResponse(final Command responseCommand) { |
||||
responseCommandReference.set(responseCommand); |
||||
this.latch.countDown(); |
||||
FUTURE_TABLE.remove(opaque); |
||||
} |
||||
|
||||
/** |
||||
* whether timeout |
||||
* @return timeout |
||||
*/ |
||||
public boolean isTimeout() { |
||||
long diff = System.currentTimeMillis() - this.beginTimestamp; |
||||
return diff > this.timeoutMillis; |
||||
} |
||||
|
||||
public static void notify(final Command responseCommand){ |
||||
TaskFuture taskFuture = FUTURE_TABLE.remove(responseCommand.getOpaque()); |
||||
if(taskFuture != null){ |
||||
taskFuture.putResponse(responseCommand); |
||||
} |
||||
} |
||||
|
||||
|
||||
public boolean isSendOK() { |
||||
return sendOk; |
||||
} |
||||
|
||||
public void setSendOk(boolean sendOk) { |
||||
this.sendOk = sendOk; |
||||
} |
||||
|
||||
public void setCause(Throwable cause) { |
||||
causeReference.set(cause); |
||||
} |
||||
|
||||
public Throwable getCause() { |
||||
return causeReference.get(); |
||||
} |
||||
|
||||
public long getOpaque() { |
||||
return opaque; |
||||
} |
||||
|
||||
public long getTimeoutMillis() { |
||||
return timeoutMillis; |
||||
} |
||||
|
||||
public long getBeginTimestamp() { |
||||
return beginTimestamp; |
||||
} |
||||
|
||||
public Command getResponseCommand() { |
||||
return responseCommandReference.get(); |
||||
} |
||||
|
||||
public void setResponseCommand(Command responseCommand) { |
||||
responseCommandReference.set(responseCommand); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* scan future table |
||||
*/ |
||||
public static void scanFutureTable(){ |
||||
final List<TaskFuture> futureList = new LinkedList<>(); |
||||
Iterator<Map.Entry<Long, TaskFuture>> it = FUTURE_TABLE.entrySet().iterator(); |
||||
while (it.hasNext()) { |
||||
Map.Entry<Long, TaskFuture> next = it.next(); |
||||
TaskFuture future = next.getValue(); |
||||
if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { |
||||
futureList.add(future); |
||||
it.remove(); |
||||
LOGGER.warn("remove timeout request : {}", future); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "TaskFuture{" + |
||||
"opaque=" + opaque + |
||||
", timeoutMillis=" + timeoutMillis + |
||||
", latch=" + latch + |
||||
", beginTimestamp=" + beginTimestamp + |
||||
", responseCommand=" + responseCommandReference.get() + |
||||
", sendOk=" + sendOk + |
||||
", cause=" + causeReference.get() + |
||||
'}'; |
||||
} |
||||
} |
@ -0,0 +1,42 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
public class HostUpdateResponseProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(HostUpdateResponseProcessor.class); |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); |
||||
|
||||
HostUpdateResponseProcessor responseCommand = JSONUtils.parseObject(command.getBody(), HostUpdateResponseProcessor.class); |
||||
logger.info("received process host response command : {}", responseCommand); |
||||
} |
||||
} |
@ -0,0 +1,74 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* handle state event received from master/api |
||||
*/ |
||||
public class StateEventProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class); |
||||
|
||||
private StateEventResponseService stateEventResponseService; |
||||
|
||||
public StateEventProcessor() { |
||||
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); |
||||
} |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { |
||||
this.stateEventResponseService.init(processInstanceExecMaps); |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); |
||||
|
||||
StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class); |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
stateEvent.setKey(stateEventChangeCommand.getKey()); |
||||
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); |
||||
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; |
||||
stateEvent.setType(type); |
||||
|
||||
logger.info("received command : {}", stateEvent.toString()); |
||||
stateEventResponseService.addResponse(stateEvent); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,149 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor.queue; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import javax.annotation.PostConstruct; |
||||
import javax.annotation.PreDestroy; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* task manager |
||||
*/ |
||||
@Component |
||||
public class StateEventResponseService { |
||||
|
||||
/** |
||||
* logger |
||||
*/ |
||||
private final Logger logger = LoggerFactory.getLogger(StateEventResponseService.class); |
||||
|
||||
/** |
||||
* attemptQueue |
||||
*/ |
||||
private final BlockingQueue<StateEvent> eventQueue = new LinkedBlockingQueue<>(5000); |
||||
|
||||
/** |
||||
* task response worker |
||||
*/ |
||||
private Thread responseWorker; |
||||
|
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper; |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) { |
||||
if (this.processInstanceMapper == null) { |
||||
this.processInstanceMapper = processInstanceMapper; |
||||
} |
||||
} |
||||
|
||||
@PostConstruct |
||||
public void start() { |
||||
this.responseWorker = new StateEventResponseWorker(); |
||||
this.responseWorker.setName("StateEventResponseWorker"); |
||||
this.responseWorker.start(); |
||||
} |
||||
|
||||
@PreDestroy |
||||
public void stop() { |
||||
this.responseWorker.interrupt(); |
||||
if (!eventQueue.isEmpty()) { |
||||
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size()); |
||||
eventQueue.drainTo(remainEvents); |
||||
for (StateEvent event : remainEvents) { |
||||
this.persist(event); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* put task to attemptQueue |
||||
*/ |
||||
public void addResponse(StateEvent stateEvent) { |
||||
try { |
||||
eventQueue.put(stateEvent); |
||||
} catch (InterruptedException e) { |
||||
logger.error("put state event : {} error :{}", stateEvent, e); |
||||
Thread.currentThread().interrupt(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* task worker thread |
||||
*/ |
||||
class StateEventResponseWorker extends Thread { |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
// if not task , blocking here
|
||||
StateEvent stateEvent = eventQueue.take(); |
||||
persist(stateEvent); |
||||
} catch (InterruptedException e) { |
||||
logger.warn("persist task error", e); |
||||
Thread.currentThread().interrupt(); |
||||
} |
||||
} |
||||
logger.info("StateEventResponseWorker stopped"); |
||||
} |
||||
} |
||||
|
||||
private void writeResponse(StateEvent stateEvent, ExecutionStatus status) { |
||||
Channel channel = stateEvent.getChannel(); |
||||
if (channel != null) { |
||||
StateEventResponseCommand command = new StateEventResponseCommand(status.getCode(), stateEvent.getKey()); |
||||
channel.writeAndFlush(command.convert2Command()); |
||||
} |
||||
} |
||||
|
||||
private void persist(StateEvent stateEvent) { |
||||
try { |
||||
if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) { |
||||
writeResponse(stateEvent, ExecutionStatus.FAILURE); |
||||
return; |
||||
} |
||||
|
||||
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId()); |
||||
workflowExecuteThread.addStateEvent(stateEvent); |
||||
writeResponse(stateEvent, ExecutionStatus.SUCCESS); |
||||
} catch (Exception e) { |
||||
logger.error("persist event queue error:", stateEvent.toString(), e); |
||||
} |
||||
} |
||||
|
||||
public BlockingQueue<StateEvent> getEventQueue() { |
||||
return eventQueue; |
||||
} |
||||
} |
@ -0,0 +1,195 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
import org.apache.dolphinscheduler.common.utils.NetUtils; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ExecutorService; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import com.google.common.util.concurrent.FutureCallback; |
||||
import com.google.common.util.concurrent.Futures; |
||||
import com.google.common.util.concurrent.ListenableFuture; |
||||
import com.google.common.util.concurrent.ListeningExecutorService; |
||||
import com.google.common.util.concurrent.MoreExecutors; |
||||
|
||||
@Service |
||||
public class EventExecuteService extends Thread { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class); |
||||
|
||||
|
||||
/** |
||||
* dolphinscheduler database interface
|
||||
*/ |
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Autowired |
||||
private MasterConfig masterConfig; |
||||
|
||||
private ExecutorService eventExecService; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
private StateEventCallbackService stateEventCallbackService; |
||||
|
||||
|
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; |
||||
private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap(); |
||||
ListeningExecutorService listeningExecutorService; |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { |
||||
|
||||
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads()); |
||||
|
||||
this.processInstanceExecMaps = processInstanceExecMaps; |
||||
|
||||
listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); |
||||
this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public synchronized void start() { |
||||
super.setName("EventServiceStarted"); |
||||
super.start(); |
||||
} |
||||
|
||||
public void close() { |
||||
eventExecService.shutdown(); |
||||
logger.info("event service stopped..."); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
logger.info("Event service started"); |
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
eventHandler(); |
||||
|
||||
} catch (Exception e) { |
||||
logger.error("Event service thread error", e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void eventHandler() { |
||||
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) { |
||||
if (workflowExecuteThread.eventSize() == 0 |
||||
|| StringUtils.isEmpty(workflowExecuteThread.getKey()) |
||||
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { |
||||
continue; |
||||
} |
||||
int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); |
||||
logger.info("handle process instance : {} events, count:{}", |
||||
processInstanceId, |
||||
workflowExecuteThread.eventSize()); |
||||
logger.info("already exists handler process size:{}", this.eventHandlerMap.size()); |
||||
eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); |
||||
ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread); |
||||
FutureCallback futureCallback = new FutureCallback() { |
||||
@Override |
||||
public void onSuccess(Object o) { |
||||
if (workflowExecuteThread.workFlowFinish()) { |
||||
processInstanceExecMaps.remove(processInstanceId); |
||||
notifyProcessChanged(); |
||||
logger.info("process instance {} finished.", processInstanceId); |
||||
} |
||||
if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { |
||||
processInstanceExecMaps.remove(processInstanceId); |
||||
processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); |
||||
|
||||
} |
||||
eventHandlerMap.remove(workflowExecuteThread.getKey()); |
||||
} |
||||
|
||||
private void notifyProcessChanged() { |
||||
Map<ProcessInstance, TaskInstance> fatherMaps |
||||
= processService.notifyProcessList(processInstanceId, 0); |
||||
|
||||
for (ProcessInstance processInstance : fatherMaps.keySet()) { |
||||
String address = NetUtils.getAddr(masterConfig.getListenPort()); |
||||
if (processInstance.getHost().equalsIgnoreCase(address)) { |
||||
notifyMyself(processInstance, fatherMaps.get(processInstance)); |
||||
} else { |
||||
notifyProcess(processInstance, fatherMaps.get(processInstance)); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { |
||||
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); |
||||
if (!processInstanceExecMaps.containsKey(processInstance.getId())) { |
||||
return; |
||||
} |
||||
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
stateEvent.setProcessInstanceId(processInstance.getId()); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
workflowExecuteThreadNotify.addStateEvent(stateEvent); |
||||
} |
||||
|
||||
private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) { |
||||
String host = processInstance.getHost(); |
||||
if (StringUtils.isEmpty(host)) { |
||||
logger.info("process {} host is empty, cannot notify task {} now.", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
return; |
||||
} |
||||
String address = host.split(":")[0]; |
||||
int port = Integer.parseInt(host.split(":")[1]); |
||||
logger.info("notify process {} task {} state change, host:{}", |
||||
processInstance.getId(), taskInstance.getId(), host); |
||||
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( |
||||
processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() |
||||
); |
||||
|
||||
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable throwable) { |
||||
} |
||||
}; |
||||
Futures.addCallback(future, futureCallback, this.listeningExecutorService); |
||||
} |
||||
} |
||||
} |
@ -1,324 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; |
||||
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
||||
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.AlertDao; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; |
||||
|
||||
import java.util.Date; |
||||
import java.util.concurrent.Callable; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* master task exec base class
|
||||
*/ |
||||
public class MasterBaseTaskExecThread implements Callable<Boolean> { |
||||
|
||||
/** |
||||
* logger of MasterBaseTaskExecThread |
||||
*/ |
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
|
||||
/** |
||||
* process service |
||||
*/ |
||||
protected ProcessService processService; |
||||
|
||||
/** |
||||
* alert database access |
||||
*/ |
||||
protected AlertDao alertDao; |
||||
|
||||
/** |
||||
* process instance |
||||
*/ |
||||
protected ProcessInstance processInstance; |
||||
|
||||
/** |
||||
* task instance |
||||
*/ |
||||
protected TaskInstance taskInstance; |
||||
|
||||
/** |
||||
* whether need cancel |
||||
*/ |
||||
protected boolean cancel; |
||||
|
||||
/** |
||||
* master config |
||||
*/ |
||||
protected MasterConfig masterConfig; |
||||
|
||||
/** |
||||
* taskUpdateQueue |
||||
*/ |
||||
private TaskPriorityQueue taskUpdateQueue; |
||||
|
||||
/** |
||||
* whether need check task time out. |
||||
*/ |
||||
protected boolean checkTimeoutFlag = false; |
||||
|
||||
/** |
||||
* task timeout parameters |
||||
*/ |
||||
protected TaskTimeoutParameter taskTimeoutParameter; |
||||
|
||||
/** |
||||
* constructor of MasterBaseTaskExecThread |
||||
* |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public MasterBaseTaskExecThread(TaskInstance taskInstance) { |
||||
this.processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
this.alertDao = SpringApplicationContext.getBean(AlertDao.class); |
||||
this.cancel = false; |
||||
this.taskInstance = taskInstance; |
||||
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); |
||||
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); |
||||
initTaskParams(); |
||||
} |
||||
|
||||
/** |
||||
* init task ordinary parameters |
||||
*/ |
||||
private void initTaskParams() { |
||||
initTimeoutParams(); |
||||
} |
||||
|
||||
/** |
||||
* init task timeout parameters |
||||
*/ |
||||
private void initTimeoutParams() { |
||||
TaskDefinition taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); |
||||
boolean timeoutEnable = taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN; |
||||
taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable, |
||||
taskDefinition.getTimeoutNotifyStrategy(), |
||||
taskDefinition.getTimeout()); |
||||
if (taskTimeoutParameter.getEnable()) { |
||||
checkTimeoutFlag = true; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get task instance |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
public TaskInstance getTaskInstance() { |
||||
return this.taskInstance; |
||||
} |
||||
|
||||
/** |
||||
* kill master base task exec thread |
||||
*/ |
||||
public void kill() { |
||||
this.cancel = true; |
||||
} |
||||
|
||||
/** |
||||
* submit master base task exec thread |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
protected TaskInstance submit() { |
||||
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); |
||||
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); |
||||
|
||||
int retryTimes = 1; |
||||
boolean submitDB = false; |
||||
boolean submitTask = false; |
||||
TaskInstance task = null; |
||||
while (retryTimes <= commitRetryTimes) { |
||||
try { |
||||
if (!submitDB) { |
||||
// submit task to db
|
||||
task = processService.submitTask(taskInstance); |
||||
if (task != null && task.getId() != 0) { |
||||
submitDB = true; |
||||
} |
||||
} |
||||
if (submitDB && !submitTask) { |
||||
// dispatch task
|
||||
submitTask = dispatchTask(task); |
||||
} |
||||
if (submitDB && submitTask) { |
||||
return task; |
||||
} |
||||
if (!submitDB) { |
||||
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); |
||||
} else if (!submitTask) { |
||||
logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes); |
||||
} |
||||
Thread.sleep(commitRetryInterval); |
||||
} catch (Exception e) { |
||||
logger.error("task commit to mysql and dispatcht task failed", e); |
||||
} |
||||
retryTimes += 1; |
||||
} |
||||
return task; |
||||
} |
||||
|
||||
/** |
||||
* dispatch task |
||||
* |
||||
* @param taskInstance taskInstance |
||||
* @return whether submit task success |
||||
*/ |
||||
public Boolean dispatchTask(TaskInstance taskInstance) { |
||||
|
||||
try { |
||||
if (taskInstance.isConditionsTask() |
||||
|| taskInstance.isDependTask() |
||||
|| taskInstance.isSubProcess()) { |
||||
return true; |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); |
||||
return true; |
||||
} |
||||
// task cannot be submitted because its execution state is RUNNING or DELAY.
|
||||
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION |
||||
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) { |
||||
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); |
||||
return true; |
||||
} |
||||
logger.info("task ready to submit: {}", taskInstance); |
||||
|
||||
/** |
||||
* taskPriority |
||||
*/ |
||||
TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(), |
||||
processInstance.getId(), |
||||
taskInstance.getProcessInstancePriority().getCode(), |
||||
taskInstance.getId(), |
||||
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); |
||||
taskUpdateQueue.put(taskPriority); |
||||
logger.info(String.format("master submit success, task : %s", taskInstance.getName())); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("submit task Exception: ", e); |
||||
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance)); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* buildTaskPriority |
||||
* |
||||
* @param processInstancePriority processInstancePriority |
||||
* @param processInstanceId processInstanceId |
||||
* @param taskInstancePriority taskInstancePriority |
||||
* @param taskInstanceId taskInstanceId |
||||
* @param workerGroup workerGroup |
||||
* @return TaskPriority |
||||
*/ |
||||
private TaskPriority buildTaskPriority(int processInstancePriority, |
||||
int processInstanceId, |
||||
int taskInstancePriority, |
||||
int taskInstanceId, |
||||
String workerGroup) { |
||||
return new TaskPriority(processInstancePriority, processInstanceId, |
||||
taskInstancePriority, taskInstanceId, workerGroup); |
||||
} |
||||
|
||||
/** |
||||
* submit wait complete |
||||
* |
||||
* @return true |
||||
*/ |
||||
protected Boolean submitWaitComplete() { |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* call |
||||
* |
||||
* @return boolean |
||||
*/ |
||||
@Override |
||||
public Boolean call() { |
||||
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); |
||||
return submitWaitComplete(); |
||||
} |
||||
|
||||
/** |
||||
* alert time out |
||||
*/ |
||||
protected boolean alertTimeout() { |
||||
if (TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()) { |
||||
return true; |
||||
} |
||||
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", |
||||
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); |
||||
// send warn mail
|
||||
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(), |
||||
taskInstance.getId(), taskInstance.getName()); |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* handle time out for time out strategy warn&&failed |
||||
*/ |
||||
protected void handleTimeoutFailed() { |
||||
if (TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()) { |
||||
return; |
||||
} |
||||
logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.", |
||||
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); |
||||
this.cancel = true; |
||||
} |
||||
|
||||
/** |
||||
* check task remain time valid |
||||
*/ |
||||
protected boolean checkTaskTimeout() { |
||||
if (!checkTimeoutFlag || taskInstance.getStartTime() == null) { |
||||
return false; |
||||
} |
||||
long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L); |
||||
return remainTime <= 0; |
||||
} |
||||
|
||||
/** |
||||
* get remain time |
||||
* |
||||
* @return remain time |
||||
*/ |
||||
protected long getRemainTime(long timeoutSeconds) { |
||||
Date startTime = taskInstance.getStartTime(); |
||||
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; |
||||
return timeoutSeconds - usedTime; |
||||
} |
||||
} |
@ -1,230 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.registry.RegistryClient; |
||||
|
||||
import java.util.Date; |
||||
import java.util.Set; |
||||
|
||||
/** |
||||
* master task exec thread |
||||
*/ |
||||
public class MasterTaskExecThread extends MasterBaseTaskExecThread { |
||||
|
||||
/** |
||||
* taskInstance state manager |
||||
*/ |
||||
private TaskInstanceCacheManager taskInstanceCacheManager; |
||||
|
||||
/** |
||||
* netty executor manager |
||||
*/ |
||||
private NettyExecutorManager nettyExecutorManager; |
||||
|
||||
|
||||
/** |
||||
* zookeeper register center |
||||
*/ |
||||
private RegistryClient registryClient; |
||||
|
||||
/** |
||||
* constructor of MasterTaskExecThread |
||||
* |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public MasterTaskExecThread(TaskInstance taskInstance) { |
||||
super(taskInstance); |
||||
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); |
||||
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); |
||||
this.registryClient = RegistryClient.getInstance(); |
||||
} |
||||
|
||||
/** |
||||
* get task instance |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
@Override |
||||
public TaskInstance getTaskInstance() { |
||||
return this.taskInstance; |
||||
} |
||||
|
||||
/** |
||||
* whether already Killed,default false |
||||
*/ |
||||
private boolean alreadyKilled = false; |
||||
|
||||
/** |
||||
* submit task instance and wait complete |
||||
* |
||||
* @return true is task quit is true |
||||
*/ |
||||
@Override |
||||
public Boolean submitWaitComplete() { |
||||
Boolean result = false; |
||||
this.taskInstance = submit(); |
||||
if (this.taskInstance == null) { |
||||
logger.error("submit task instance to mysql and queue failed , please check and fix it"); |
||||
return result; |
||||
} |
||||
if (!this.taskInstance.getState().typeIsFinished()) { |
||||
result = waitTaskQuit(); |
||||
} |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("task :{} id:{}, process id:{}, exec thread completed ", |
||||
this.taskInstance.getName(), taskInstance.getId(), processInstance.getId()); |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* polling db |
||||
* <p> |
||||
* wait task quit |
||||
* |
||||
* @return true if task quit success |
||||
*/ |
||||
public Boolean waitTaskQuit() { |
||||
// query new state
|
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
logger.info("wait task: process id: {}, task id:{}, task name:{} complete", |
||||
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); |
||||
|
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
if (this.processInstance == null) { |
||||
logger.error("process instance not exists , master task exec thread exit"); |
||||
return true; |
||||
} |
||||
// task instance add queue , waiting worker to kill
|
||||
if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) { |
||||
cancelTaskInstance(); |
||||
} |
||||
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { |
||||
pauseTask(); |
||||
} |
||||
// task instance finished
|
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
// if task is final result , then remove taskInstance from cache
|
||||
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); |
||||
break; |
||||
} |
||||
if (checkTaskTimeout()) { |
||||
this.checkTimeoutFlag = !alertTimeout(); |
||||
} |
||||
// updateProcessInstance task instance
|
||||
//issue#5539 Check status of taskInstance from cache
|
||||
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); |
||||
processInstance = processService.findProcessInstanceById(processInstance.getId()); |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} catch (Exception e) { |
||||
logger.error("exception", e); |
||||
if (processInstance != null) { |
||||
logger.error("wait task quit failed, instance id:{}, task id:{}", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
} |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* pause task if task have not been dispatched to worker, do not dispatch anymore. |
||||
*/ |
||||
public void pauseTask() { |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (taskInstance == null) { |
||||
return; |
||||
} |
||||
if (StringUtils.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.PAUSE); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* task instance add queue , waiting worker to kill |
||||
*/ |
||||
private void cancelTaskInstance() throws Exception { |
||||
if (alreadyKilled) { |
||||
return; |
||||
} |
||||
alreadyKilled = true; |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (StringUtils.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.KILL); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return; |
||||
} |
||||
|
||||
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); |
||||
killCommand.setTaskInstanceId(taskInstance.getId()); |
||||
|
||||
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); |
||||
|
||||
Host host = Host.of(taskInstance.getHost()); |
||||
executionContext.setHost(host); |
||||
|
||||
nettyExecutorManager.executeDirectly(executionContext); |
||||
|
||||
logger.info("master kill taskInstance name :{} taskInstance id:{}", |
||||
taskInstance.getName(), taskInstance.getId()); |
||||
} |
||||
|
||||
/** |
||||
* whether exists valid worker group |
||||
* |
||||
* @param taskInstanceWorkerGroup taskInstanceWorkerGroup |
||||
* @return whether exists |
||||
*/ |
||||
public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup) { |
||||
Set<String> workerGroups = registryClient.getWorkerGroupDirectly(); |
||||
// not worker group
|
||||
if (CollectionUtils.isEmpty(workerGroups)) { |
||||
return false; |
||||
} |
||||
|
||||
// has worker group , but not taskInstance assigned worker group
|
||||
if (!workerGroups.contains(taskInstanceWorkerGroup)) { |
||||
return false; |
||||
} |
||||
Set<String> workers = registryClient.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); |
||||
if (CollectionUtils.isEmpty(workers)) { |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,154 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import org.apache.hadoop.util.ThreadUtil; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* 1. timeout check wheel |
||||
* 2. dependent task check wheel |
||||
*/ |
||||
public class StateWheelExecuteThread extends Thread { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); |
||||
|
||||
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList; |
||||
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList; |
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; |
||||
|
||||
private int stateCheckIntervalSecs; |
||||
|
||||
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances, |
||||
ConcurrentHashMap<Integer, TaskInstance> taskInstances, |
||||
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps, |
||||
int stateCheckIntervalSecs) { |
||||
this.processInstanceCheckList = processInstances; |
||||
this.taskInstanceCheckList = taskInstances; |
||||
this.processInstanceExecMaps = processInstanceExecMaps; |
||||
this.stateCheckIntervalSecs = stateCheckIntervalSecs; |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
logger.info("state wheel thread start"); |
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
checkProcess(); |
||||
checkTask(); |
||||
} catch (Exception e) { |
||||
logger.error("state wheel thread check error:", e); |
||||
} |
||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs); |
||||
} |
||||
} |
||||
|
||||
public boolean addProcess(ProcessInstance processInstance) { |
||||
this.processInstanceCheckList.put(processInstance.getId(), processInstance); |
||||
return true; |
||||
} |
||||
|
||||
public boolean addTask(TaskInstance taskInstance) { |
||||
this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
private void checkTask() { |
||||
if (taskInstanceCheckList.isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) { |
||||
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { |
||||
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); |
||||
if (0 <= timeRemain && processTimeout(taskInstance)) { |
||||
taskInstanceCheckList.remove(taskInstance.getId()); |
||||
return; |
||||
} |
||||
} |
||||
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { |
||||
processDependCheck(taskInstance); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void checkProcess() { |
||||
if (processInstanceCheckList.isEmpty()) { |
||||
return; |
||||
} |
||||
for (ProcessInstance processInstance : this.processInstanceCheckList.values()) { |
||||
|
||||
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); |
||||
if (0 <= timeRemain && processTimeout(processInstance)) { |
||||
processInstanceCheckList.remove(processInstance.getId()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void putEvent(StateEvent stateEvent) { |
||||
|
||||
if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { |
||||
return; |
||||
} |
||||
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); |
||||
workflowExecuteThread.addStateEvent(stateEvent); |
||||
} |
||||
|
||||
private boolean processDependCheck(TaskInstance taskInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
private boolean processTimeout(TaskInstance taskInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.TASK_TIMEOUT); |
||||
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
private boolean processTimeout(ProcessInstance processInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.PROCESS_TIMEOUT); |
||||
stateEvent.setProcessInstanceId(processInstance.getId()); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
} |
@ -1,181 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* subflow task exec thread |
||||
*/ |
||||
public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { |
||||
|
||||
/** |
||||
* sub process instance |
||||
*/ |
||||
private ProcessInstance subProcessInstance; |
||||
|
||||
/** |
||||
* sub process task exec thread |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public SubProcessTaskExecThread(TaskInstance taskInstance){ |
||||
super(taskInstance); |
||||
} |
||||
|
||||
@Override |
||||
public Boolean submitWaitComplete() { |
||||
|
||||
Boolean result = false; |
||||
try{ |
||||
// submit task instance
|
||||
this.taskInstance = submit(); |
||||
|
||||
if(taskInstance == null){ |
||||
logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it"); |
||||
return result; |
||||
} |
||||
setTaskInstanceState(); |
||||
waitTaskQuit(); |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
|
||||
// at the end of the subflow , the task state is changed to the subflow state
|
||||
if(subProcessInstance != null){ |
||||
if(subProcessInstance.getState() == ExecutionStatus.STOP){ |
||||
this.taskInstance.setState(ExecutionStatus.KILL); |
||||
}else{ |
||||
this.taskInstance.setState(subProcessInstance.getState()); |
||||
} |
||||
} |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ", |
||||
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() ); |
||||
result = true; |
||||
|
||||
}catch (Exception e){ |
||||
logger.error("exception: ",e); |
||||
if (null != taskInstance) { |
||||
logger.error("wait task quit failed, instance id:{}, task id:{}", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
} |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* set task instance state |
||||
* @return |
||||
*/ |
||||
private boolean setTaskInstanceState(){ |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if(subProcessInstance == null || taskInstance.getState().typeIsFinished()){ |
||||
return false; |
||||
} |
||||
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* updateProcessInstance parent state |
||||
*/ |
||||
private void updateParentProcessState(){ |
||||
ProcessInstance parentProcessInstance = processService.findProcessInstanceById(this.processInstance.getId()); |
||||
|
||||
if(parentProcessInstance == null){ |
||||
logger.error("parent work flow instance is null , please check it! work flow id {}", processInstance.getId()); |
||||
return; |
||||
} |
||||
this.processInstance.setState(parentProcessInstance.getState()); |
||||
} |
||||
|
||||
/** |
||||
* wait task quit |
||||
* @throws InterruptedException |
||||
*/ |
||||
private void waitTaskQuit() throws InterruptedException { |
||||
|
||||
logger.info("wait sub work flow: {} complete", this.taskInstance.getName()); |
||||
|
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}", |
||||
this.taskInstance.getName(), |
||||
this.taskInstance.getState(), |
||||
this.processInstance.getState()); |
||||
return; |
||||
} |
||||
while (Stopper.isRunning()) { |
||||
// waiting for subflow process instance establishment
|
||||
if (subProcessInstance == null) { |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
if(!setTaskInstanceState()){ |
||||
continue; |
||||
} |
||||
} |
||||
subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId()); |
||||
if (checkTaskTimeout()) { |
||||
this.checkTimeoutFlag = !alertTimeout(); |
||||
handleTimeoutFailed(); |
||||
} |
||||
updateParentProcessState(); |
||||
if (subProcessInstance.getState().typeIsFinished()){ |
||||
break; |
||||
} |
||||
if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){ |
||||
// parent process "ready to pause" , child process "pause"
|
||||
pauseSubProcess(); |
||||
}else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ |
||||
// parent Process "Ready to Cancel" , subflow "Cancel"
|
||||
stopSubProcess(); |
||||
} |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* stop sub process |
||||
*/ |
||||
private void stopSubProcess() { |
||||
if(subProcessInstance.getState() == ExecutionStatus.STOP || |
||||
subProcessInstance.getState() == ExecutionStatus.READY_STOP){ |
||||
return; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_STOP); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
} |
||||
|
||||
/** |
||||
* pause sub process |
||||
*/ |
||||
private void pauseSubProcess() { |
||||
if(subProcessInstance.getState() == ExecutionStatus.PAUSE || |
||||
subProcessInstance.getState() == ExecutionStatus.READY_PAUSE){ |
||||
return; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_PAUSE); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
} |
||||
} |
@ -0,0 +1,112 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public abstract class BaseTaskProcessor implements ITaskProcessor { |
||||
|
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
protected boolean killed = false; |
||||
|
||||
protected boolean paused = false; |
||||
|
||||
protected boolean timeout = false; |
||||
|
||||
protected TaskInstance taskInstance = null; |
||||
|
||||
protected ProcessInstance processInstance; |
||||
|
||||
/** |
||||
* pause task, common tasks donot need this. |
||||
* |
||||
* @return |
||||
*/ |
||||
protected abstract boolean pauseTask(); |
||||
|
||||
/** |
||||
* kill task, all tasks need to realize this function |
||||
* |
||||
* @return |
||||
*/ |
||||
protected abstract boolean killTask(); |
||||
|
||||
/** |
||||
* task timeout process |
||||
* @return |
||||
*/ |
||||
protected abstract boolean taskTimeout(); |
||||
|
||||
@Override |
||||
public void run() { |
||||
} |
||||
|
||||
@Override |
||||
public boolean action(TaskAction taskAction) { |
||||
|
||||
switch (taskAction) { |
||||
case STOP: |
||||
return stop(); |
||||
case PAUSE: |
||||
return pause(); |
||||
case TIMEOUT: |
||||
return timeout(); |
||||
default: |
||||
logger.error("unknown task action: {}", taskAction.toString()); |
||||
|
||||
} |
||||
return false; |
||||
} |
||||
|
||||
protected boolean timeout() { |
||||
if (timeout) { |
||||
return true; |
||||
} |
||||
timeout = taskTimeout(); |
||||
return timeout; |
||||
} |
||||
|
||||
/** |
||||
* @return |
||||
*/ |
||||
protected boolean pause() { |
||||
if (paused) { |
||||
return true; |
||||
} |
||||
paused = pauseTask(); |
||||
return paused; |
||||
} |
||||
|
||||
protected boolean stop() { |
||||
if (killed) { |
||||
return true; |
||||
} |
||||
killed = killTask(); |
||||
return killed; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return null; |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
|
||||
public class CommonTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return Constants.COMMON_TASK_TYPE; |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new CommonTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,179 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; |
||||
|
||||
import org.apache.logging.log4j.util.Strings; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
||||
/** |
||||
* common task processor |
||||
*/ |
||||
public class CommonTaskProcessor extends BaseTaskProcessor { |
||||
|
||||
@Autowired |
||||
private TaskPriorityQueue taskUpdateQueue; |
||||
|
||||
@Autowired |
||||
MasterConfig masterConfig; |
||||
|
||||
@Autowired |
||||
NettyExecutorManager nettyExecutorManager; |
||||
|
||||
/** |
||||
* logger of MasterBaseTaskExecThread |
||||
*/ |
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
@Override |
||||
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { |
||||
this.processInstance = processInstance; |
||||
this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval); |
||||
|
||||
if (this.taskInstance == null) { |
||||
return false; |
||||
} |
||||
dispatchTask(taskInstance, processInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public ExecutionStatus taskState() { |
||||
return this.taskInstance.getState(); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
} |
||||
|
||||
@Override |
||||
protected boolean taskTimeout() { |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* common task cannot be paused |
||||
* |
||||
* @return |
||||
*/ |
||||
@Override |
||||
protected boolean pauseTask() { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return Constants.COMMON_TASK_TYPE; |
||||
} |
||||
|
||||
private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) { |
||||
|
||||
try { |
||||
if (taskUpdateQueue == null) { |
||||
this.initQueue(); |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); |
||||
return true; |
||||
} |
||||
// task cannot be submitted because its execution state is RUNNING or DELAY.
|
||||
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION |
||||
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) { |
||||
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); |
||||
return true; |
||||
} |
||||
logger.info("task ready to submit: {}", taskInstance); |
||||
|
||||
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), |
||||
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), |
||||
taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); |
||||
taskUpdateQueue.put(taskPriority); |
||||
logger.info(String.format("master submit success, task : %s", taskInstance.getName())); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("submit task Exception: ", e); |
||||
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance)); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
public void initQueue() { |
||||
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); |
||||
} |
||||
|
||||
@Override |
||||
public boolean killTask() { |
||||
|
||||
try { |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (taskInstance == null) { |
||||
return true; |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
return true; |
||||
} |
||||
if (Strings.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.KILL); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); |
||||
killCommand.setTaskInstanceId(taskInstance.getId()); |
||||
|
||||
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); |
||||
|
||||
Host host = Host.of(taskInstance.getHost()); |
||||
executionContext.setHost(host); |
||||
|
||||
nettyExecutorManager.executeDirectly(executionContext); |
||||
} catch (ExecuteException e) { |
||||
logger.error("kill task error:", e); |
||||
return false; |
||||
} |
||||
|
||||
logger.info("master kill taskInstance name :{} taskInstance id:{}", |
||||
taskInstance.getName(), taskInstance.getId()); |
||||
return true; |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class ConditionTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return TaskType.CONDITIONS.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new ConditionTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class DependentTaskProcessFactory implements ITaskProcessFactory { |
||||
|
||||
@Override |
||||
public String type() { |
||||
return TaskType.DEPENDENT.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new DependentTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,25 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
public interface ITaskProcessFactory { |
||||
|
||||
String type(); |
||||
|
||||
ITaskProcessor create(); |
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
/** |
||||
* interface of task processor in master |
||||
*/ |
||||
public interface ITaskProcessor { |
||||
|
||||
void run(); |
||||
|
||||
boolean action(TaskAction taskAction); |
||||
|
||||
String getType(); |
||||
|
||||
boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval); |
||||
|
||||
ExecutionStatus taskState(); |
||||
|
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class SubTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return TaskType.SUB_PROCESS.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new SubTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,171 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; |
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Date; |
||||
import java.util.concurrent.locks.Lock; |
||||
import java.util.concurrent.locks.ReentrantLock; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
public class SubTaskProcessor extends BaseTaskProcessor { |
||||
|
||||
private ProcessInstance processInstance; |
||||
|
||||
private ProcessInstance subProcessInstance = null; |
||||
private TaskDefinition taskDefinition; |
||||
|
||||
/** |
||||
* run lock |
||||
*/ |
||||
private final Lock runLock = new ReentrantLock(); |
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
@Override |
||||
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { |
||||
this.processInstance = processInstance; |
||||
taskDefinition = processService.findTaskDefinition( |
||||
task.getTaskCode(), task.getTaskDefinitionVersion() |
||||
); |
||||
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); |
||||
|
||||
if (this.taskInstance == null) { |
||||
return false; |
||||
} |
||||
|
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public ExecutionStatus taskState() { |
||||
return this.taskInstance.getState(); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
try { |
||||
this.runLock.lock(); |
||||
if (setSubWorkFlow()) { |
||||
updateTaskState(); |
||||
} |
||||
} catch (Exception e) { |
||||
logger.error("work flow {} sub task {} exceptions", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId(), |
||||
e); |
||||
} finally { |
||||
this.runLock.unlock(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected boolean taskTimeout() { |
||||
TaskTimeoutStrategy taskTimeoutStrategy = |
||||
taskDefinition.getTimeoutNotifyStrategy(); |
||||
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy |
||||
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { |
||||
return true; |
||||
} |
||||
logger.info("sub process task {} timeout, strategy {} ", |
||||
taskInstance.getId(), taskTimeoutStrategy.getDescp()); |
||||
killTask(); |
||||
return true; |
||||
} |
||||
|
||||
private void updateTaskState() { |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
logger.info("work flow {} task {}, sub work flow: {} state: {}", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId(), |
||||
subProcessInstance.getId(), |
||||
subProcessInstance.getState().getDescp()); |
||||
if (subProcessInstance != null && subProcessInstance.getState().typeIsFinished()) { |
||||
taskInstance.setState(subProcessInstance.getState()); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.saveTaskInstance(taskInstance); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected boolean pauseTask() { |
||||
pauseSubWorkFlow(); |
||||
return true; |
||||
} |
||||
|
||||
private boolean pauseSubWorkFlow() { |
||||
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_PAUSE); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
//TODO...
|
||||
// send event to sub process master
|
||||
return true; |
||||
} |
||||
|
||||
private boolean setSubWorkFlow() { |
||||
logger.info("set work flow {} task {} running", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId()); |
||||
if (this.subProcessInstance != null) { |
||||
return true; |
||||
} |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("set sub work flow {} task {} state: {}", |
||||
processInstance.getId(), |
||||
taskInstance.getId(), |
||||
taskInstance.getState()); |
||||
return true; |
||||
|
||||
} |
||||
|
||||
@Override |
||||
protected boolean killTask() { |
||||
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_STOP); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return TaskType.SUB_PROCESS.getDesc(); |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class SwitchTaskProcessFactory implements ITaskProcessFactory { |
||||
|
||||
@Override |
||||
public String type() { |
||||
return TaskType.SWITCH.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new SwitchTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,220 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.DependResult; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
import org.apache.dolphinscheduler.common.process.Property; |
||||
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; |
||||
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.NetUtils; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.server.utils.LogUtils; |
||||
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.regex.Matcher; |
||||
import java.util.regex.Pattern; |
||||
import java.util.stream.Collectors; |
||||
|
||||
public class SwitchTaskProcessor extends BaseTaskProcessor { |
||||
|
||||
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; |
||||
|
||||
private TaskInstance taskInstance; |
||||
|
||||
private ProcessInstance processInstance; |
||||
TaskDefinition taskDefinition; |
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); |
||||
|
||||
/** |
||||
* switch result |
||||
*/ |
||||
private DependResult conditionResult; |
||||
|
||||
@Override |
||||
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { |
||||
|
||||
this.processInstance = processInstance; |
||||
this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); |
||||
|
||||
if (this.taskInstance == null) { |
||||
return false; |
||||
} |
||||
taskDefinition = processService.findTaskDefinition( |
||||
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() |
||||
); |
||||
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(), |
||||
processInstance.getProcessDefinitionVersion(), |
||||
taskInstance.getProcessInstanceId(), |
||||
taskInstance.getId())); |
||||
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); |
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
try { |
||||
if (!this.taskState().typeIsFinished() && setSwitchResult()) { |
||||
endTaskState(); |
||||
} |
||||
} catch (Exception e) { |
||||
logger.error("update work flow {} switch task {} state error:", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId(), |
||||
e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected boolean pauseTask() { |
||||
this.taskInstance.setState(ExecutionStatus.PAUSE); |
||||
this.taskInstance.setEndTime(new Date()); |
||||
processService.saveTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
protected boolean killTask() { |
||||
this.taskInstance.setState(ExecutionStatus.KILL); |
||||
this.taskInstance.setEndTime(new Date()); |
||||
processService.saveTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
protected boolean taskTimeout() { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return TaskType.SWITCH.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ExecutionStatus taskState() { |
||||
return this.taskInstance.getState(); |
||||
} |
||||
|
||||
private boolean setSwitchResult() { |
||||
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId( |
||||
taskInstance.getProcessInstanceId() |
||||
); |
||||
Map<String, ExecutionStatus> completeTaskList = new HashMap<>(); |
||||
for (TaskInstance task : taskInstances) { |
||||
completeTaskList.putIfAbsent(task.getName(), task.getState()); |
||||
} |
||||
SwitchParameters switchParameters = taskInstance.getSwitchDependency(); |
||||
List<SwitchResultVo> switchResultVos = switchParameters.getDependTaskList(); |
||||
SwitchResultVo switchResultVo = new SwitchResultVo(); |
||||
switchResultVo.setNextNode(switchParameters.getNextNode()); |
||||
switchResultVos.add(switchResultVo); |
||||
int finalConditionLocation = switchResultVos.size() - 1; |
||||
int i = 0; |
||||
conditionResult = DependResult.SUCCESS; |
||||
for (SwitchResultVo info : switchResultVos) { |
||||
logger.info("the {} execution ", (i + 1)); |
||||
logger.info("original condition sentence:{}", info.getCondition()); |
||||
if (StringUtils.isEmpty(info.getCondition())) { |
||||
finalConditionLocation = i; |
||||
break; |
||||
} |
||||
String content = setTaskParams(info.getCondition().replaceAll("'", "\""), rgex); |
||||
logger.info("format condition sentence::{}", content); |
||||
Boolean result = null; |
||||
try { |
||||
result = SwitchTaskUtils.evaluate(content); |
||||
} catch (Exception e) { |
||||
logger.info("error sentence : {}", content); |
||||
conditionResult = DependResult.FAILED; |
||||
break; |
||||
} |
||||
logger.info("condition result : {}", result); |
||||
if (result) { |
||||
finalConditionLocation = i; |
||||
break; |
||||
} |
||||
i++; |
||||
} |
||||
switchParameters.setDependTaskList(switchResultVos); |
||||
switchParameters.setResultConditionLocation(finalConditionLocation); |
||||
taskInstance.setSwitchDependency(switchParameters); |
||||
|
||||
logger.info("the switch task depend result : {}", conditionResult); |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* update task state |
||||
*/ |
||||
private void endTaskState() { |
||||
ExecutionStatus status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; |
||||
taskInstance.setEndTime(new Date()); |
||||
taskInstance.setState(status); |
||||
processService.updateTaskInstance(taskInstance); |
||||
} |
||||
|
||||
public String setTaskParams(String content, String rgex) { |
||||
Pattern pattern = Pattern.compile(rgex); |
||||
Matcher m = pattern.matcher(content); |
||||
Map<String, Property> globalParams = JSONUtils |
||||
.toList(processInstance.getGlobalParams(), Property.class) |
||||
.stream() |
||||
.collect(Collectors.toMap(Property::getProp, Property -> Property)); |
||||
Map<String, Property> varParams = JSONUtils |
||||
.toList(taskInstance.getVarPool(), Property.class) |
||||
.stream() |
||||
.collect(Collectors.toMap(Property::getProp, Property -> Property)); |
||||
if (varParams.size() > 0) { |
||||
varParams.putAll(globalParams); |
||||
globalParams = varParams; |
||||
} |
||||
while (m.find()) { |
||||
String paramName = m.group(1); |
||||
Property property = globalParams.get(paramName); |
||||
if (property == null) { |
||||
return ""; |
||||
} |
||||
String value = property.getValue(); |
||||
if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) { |
||||
value = "\"" + value + "\""; |
||||
} |
||||
logger.info("paramName:{},paramValue{}", paramName, value); |
||||
content = content.replace("${" + paramName + "}", value); |
||||
} |
||||
return content; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,27 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
/** |
||||
* task action |
||||
*/ |
||||
public enum TaskAction { |
||||
PAUSE, |
||||
STOP, |
||||
TIMEOUT |
||||
} |
@ -0,0 +1,53 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
|
||||
import java.util.Map; |
||||
import java.util.ServiceLoader; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import com.google.common.base.Strings; |
||||
|
||||
/** |
||||
* the factory to create task processor |
||||
*/ |
||||
public class TaskProcessorFactory { |
||||
|
||||
public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>(); |
||||
|
||||
private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE; |
||||
|
||||
static { |
||||
for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) { |
||||
PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor); |
||||
} |
||||
} |
||||
|
||||
public static ITaskProcessor getTaskProcessor(String type) { |
||||
if (Strings.isNullOrEmpty(type)) { |
||||
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); |
||||
} |
||||
if (!PROCESS_FACTORY_MAP.containsKey(type)) { |
||||
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); |
||||
} |
||||
return PROCESS_FACTORY_MAP.get(type).create(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.utils; |
||||
|
||||
import javax.script.ScriptEngine; |
||||
import javax.script.ScriptEngineManager; |
||||
import javax.script.ScriptException; |
||||
|
||||
public class SwitchTaskUtils { |
||||
private static ScriptEngineManager manager; |
||||
private static ScriptEngine engine; |
||||
|
||||
static { |
||||
manager = new ScriptEngineManager(); |
||||
engine = manager.getEngineByName("js"); |
||||
} |
||||
|
||||
public static boolean evaluate(String expression) throws ScriptException { |
||||
Object result = engine.eval(expression); |
||||
return (Boolean) result; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,59 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.worker.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* update process host |
||||
* this used when master failover |
||||
*/ |
||||
public class HostUpdateProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class); |
||||
|
||||
/** |
||||
* task callback service |
||||
*/ |
||||
private final TaskCallbackService taskCallbackService; |
||||
|
||||
public HostUpdateProcessor() { |
||||
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType())); |
||||
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); |
||||
logger.info("received host update command : {}", updateCommand); |
||||
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); |
||||
|
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue