
Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 3 years ago
commit 461e6334ae
18 changed files (changed lines in parentheses):

  1. .github/PULL_REQUEST_TEMPLATE.md (20)
  2. README.md (14)
  3. ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml (4)
  4. docker/build/README.md (2)
  5. docker/build/README_zh_CN.md (2)
  6. docker/build/conf/dolphinscheduler/worker.properties.tpl (4)
  7. docker/build/startup-init-conf.sh (2)
  8. docker/docker-swarm/docker-compose.yml (2)
  9. docker/docker-swarm/docker-stack.yml (2)
  10. docker/kubernetes/dolphinscheduler/README.md (2)
  11. docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml (2)
  12. docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml (2)
  13. docker/kubernetes/dolphinscheduler/values.yaml (2)
  14. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java (7)
  15. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java (12)
  16. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java (33)
  17. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java (2)
  18. dolphinscheduler-server/src/main/resources/worker.properties (4)

.github/PULL_REQUEST_TEMPLATE.md (20)

@@ -1,19 +1,18 @@
-## *Tips*
-- *Thanks very much for contributing to Apache DolphinScheduler.*
-- *Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.*
-## What is the purpose of the pull request
-*(For example: This pull request adds checkstyle plugin.)*
+<!--Thanks very much for contributing to Apache DolphinScheduler. Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.-->
+## Purpose of the pull request
+<!--(For example: This pull request adds checkstyle plugin).-->
 ## Brief change log
-*(for example:)*
+<!--*(for example:)*
 - *Add maven-checkstyle-plugin to root pom.xml*
+-->
 ## Verify this pull request
-*(Please pick either of the following options)*
+<!--*(Please pick either of the following options)*-->
 This pull request is code cleanup without any test coverage.
@@ -25,8 +24,7 @@ This pull request is already covered by existing tests, such as *(please describ
 This change added tests and can be verified as follows:
-*(example:)*
+<!--*(example:)*
 - *Added dolphinscheduler-dao tests for end-to-end.*
 - *Added CronUtilsTest to verify the change.*
-- *Manually verified the change by testing locally.*
+- *Manually verified the change by testing locally.* -->

README.md (14)

@@ -76,23 +76,23 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release
 ## Thanks
-DolphinScheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
+DolphinScheduler is based on a lot of excellent open-source projects, such as Google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
 We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community!
 ## Get Help
-1. Submit an [[issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)]
-1. Subscribe to this mail list: https://dolphinscheduler.apache.org/en-us/community/development/subscribe.html, then email dev@dolphinscheduler.apache.org
+1. Submit an [issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)
+1. Subscribe to this mailing list: https://dolphinscheduler.apache.org/en-us/community/development/subscribe.html, then email dev@dolphinscheduler.apache.org
 ## Community
-You are so much welcomed to communicate with the developers and users of Dolphin Scheduler freely. There are two ways to find them:
-1. Join the slack channel by [this invitation link](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-mzqu52gi-rCggPkSHQ0DZYkwbTxO1Gw).
-2. Follow the [twitter account of Dolphin Scheduler](https://twitter.com/dolphinschedule) and get the latest news just on time.
+You are very welcome to communicate with the developers and users of Dolphin Scheduler. There are two ways to find them:
+1. Join the Slack channel by [this invitation link](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-mzqu52gi-rCggPkSHQ0DZYkwbTxO1Gw).
+2. Follow the [Twitter account of Dolphin Scheduler](https://twitter.com/dolphinschedule) and get the latest news on time.
 ## How to Contribute
-The community welcomes everyone to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html)]
+The community welcomes everyone to contribute, please refer to this page to find out more: [How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html).
 ## License

ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml (4)

@@ -67,12 +67,12 @@
 <on-ambari-upgrade add="true"/>
 </property>
 <property>
-<name>worker.weigth</name>
+<name>worker.host.weigth</name>
 <value>100</value>
 <value-attributes>
 <type>int</type>
 </value-attributes>
-<description>worker weight</description>
+<description>worker host weight</description>
 <on-ambari-upgrade add="true"/>
 </property>
 </configuration>

docker/build/README.md (2)

@@ -309,7 +309,7 @@ This environment variable sets port for `worker-server`. The default value is `1
 This environment variable sets groups for `worker-server`. The default value is `default`.
-**`WORKER_WEIGHT`**
+**`WORKER_HOST_WEIGHT`**
 This environment variable sets weight for `worker-server`. The default value is `100`.

docker/build/README_zh_CN.md (2)

@@ -309,7 +309,7 @@ DolphinScheduler Docker 容器通过环境变量进行配置,缺省时将会
 配置`worker-server`的分组,默认值 `default`
-**`WORKER_WEIGHT`**
+**`WORKER_HOST_WEIGHT`**
 配置`worker-server`的权重,默认之`100`。

docker/build/conf/dolphinscheduler/worker.properties.tpl (4)

@@ -33,8 +33,8 @@ worker.listen.port=${WORKER_LISTEN_PORT}
 # default worker groups
 worker.groups=${WORKER_GROUPS}
-# default worker weight
-worker.weight=${WORKER_WEIGHT}
+# default worker host weight
+worker.host.weight=${WORKER_HOST_WEIGHT}
 # alert server listener host
 alert.listen.host=${ALERT_LISTEN_HOST}

docker/build/startup-init-conf.sh (2)

@@ -84,7 +84,7 @@ export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"}
 export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"}
 export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"}
 export WORKER_GROUPS=${WORKER_GROUPS:-"default"}
-export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"}
+export WORKER_HOST_WEIGHT=${WORKER_HOST_WEIGHT:-"100"}
 export ALERT_LISTEN_HOST=${ALERT_LISTEN_HOST:-"127.0.0.1"}
 #============================================================================

docker/docker-swarm/docker-compose.yml (2)

@@ -171,7 +171,7 @@ services:
 WORKER_MAX_CPULOAD_AVG: "100"
 WORKER_RESERVED_MEMORY: "0.1"
 WORKER_GROUPS: "default"
-WORKER_WEIGHT: "100"
+WORKER_HOST_WEIGHT: "100"
 ALERT_LISTEN_HOST: dolphinscheduler-alert
 HADOOP_HOME: "/opt/soft/hadoop"
 HADOOP_CONF_DIR: "/opt/soft/hadoop/etc/hadoop"

docker/docker-swarm/docker-stack.yml (2)

@@ -165,7 +165,7 @@ services:
 WORKER_MAX_CPULOAD_AVG: "100"
 WORKER_RESERVED_MEMORY: "0.1"
 WORKER_GROUPS: "default"
-WORKER_WEIGHT: "100"
+WORKER_HOST_WEIGHT: "100"
 ALERT_LISTEN_HOST: dolphinscheduler-alert
 HADOOP_HOME: "/opt/soft/hadoop"
 HADOOP_CONF_DIR: "/opt/soft/hadoop/etc/hadoop"

docker/kubernetes/dolphinscheduler/README.md (2)

@@ -173,7 +173,7 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
 | `worker.configmap.WORKER_RESERVED_MEMORY` | Only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G | `0.1` |
 | `worker.configmap.WORKER_LISTEN_PORT` | Worker listen port | `1234` |
 | `worker.configmap.WORKER_GROUPS` | Worker groups | `default` |
-| `worker.configmap.WORKER_WEIGHT` | Worker weight | `100` |
+| `worker.configmap.WORKER_HOST_WEIGHT` | Worker host weight | `100` |
 | `worker.livenessProbe.enabled` | Turn on and off liveness probe | `true` |
 | `worker.livenessProbe.initialDelaySeconds` | Delay before liveness probe is initiated | `30` |
 | `worker.livenessProbe.periodSeconds` | How often to perform the probe | `30` |

docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml (2)

@@ -31,5 +31,5 @@ data:
 WORKER_RESERVED_MEMORY: {{ .Values.worker.configmap.WORKER_RESERVED_MEMORY | quote }}
 WORKER_LISTEN_PORT: {{ .Values.worker.configmap.WORKER_LISTEN_PORT | quote }}
 WORKER_GROUPS: {{ .Values.worker.configmap.WORKER_GROUPS | quote }}
-WORKER_WEIGHT: {{ .Values.worker.configmap.WORKER_WEIGHT | quote }}
+WORKER_HOST_WEIGHT: {{ .Values.worker.configmap.WORKER_HOST_WEIGHT | quote }}
 {{- end }}

docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml (2)

@@ -111,7 +111,7 @@ spec:
 valueFrom:
 configMapKeyRef:
 name: {{ include "dolphinscheduler.fullname" . }}-worker
-key: WORKER_WEIGHT
+key: WORKER_HOST_WEIGHT
 - name: DOLPHINSCHEDULER_DATA_BASEDIR_PATH
 valueFrom:
 configMapKeyRef:

docker/kubernetes/dolphinscheduler/values.yaml (2)

@@ -203,7 +203,7 @@ worker:
 WORKER_RESERVED_MEMORY: "0.1"
 WORKER_LISTEN_PORT: "1234"
 WORKER_GROUPS: "default"
-WORKER_WEIGHT: "100"
+WORKER_HOST_WEIGHT: "100"
 ## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
 ## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
 livenessProbe:

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java (7)

@@ -60,7 +60,7 @@ public class ProcessUtils {
 /**
 * Expression of PID recognition in Windows scene
 */
-private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
+private static final Pattern WINDOWSATTERN = Pattern.compile("\\w+\\((\\d+)\\)");
 private static final String LOCAL_PROCESS_EXEC = "jdk.lang.Process.allowAmbiguousCommands";
@@ -391,12 +391,11 @@ public class ProcessUtils {
 OSUtils.exeCmd(cmd);
-// find log and kill yarn job
-killYarnJob(taskExecutionContext);
 } catch (Exception e) {
 logger.error("kill task failed", e);
 }
+// find log and kill yarn job
+killYarnJob(taskExecutionContext);
 }
 /**
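
The tightened Windows PID pattern is easiest to see in a small, self-contained sketch. This is illustrative only: the class name and the sample input string below are assumptions, not code or output from the repository.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WindowsPidPatternSketch {

    // New pattern from the diff: a "name(pid)" token, with the PID captured in group 1.
    private static final Pattern NEW_PATTERN = Pattern.compile("\\w+\\((\\d+)\\)");
    // Old pattern: any run of digits, wherever it appears.
    private static final Pattern OLD_PATTERN = Pattern.compile("(\\d+)");

    public static void main(String[] args) {
        // Hypothetical process-listing text; real input on Windows may differ.
        String sample = "cmd.exe(1234) java.exe(5678)";

        Matcher matcher = NEW_PATTERN.matcher(sample);
        while (matcher.find()) {
            System.out.println(matcher.group(1)); // 1234, then 5678
        }

        // The old pattern would also have picked up digits that are not PIDs at all.
        Matcher old = OLD_PATTERN.matcher("exit code 42");
        while (old.find()) {
            System.out.println(old.group(1)); // 42
        }
    }
}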

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java (12)

@@ -50,8 +50,8 @@ public class WorkerConfig {
 @Value("${worker.listen.port: 1234}")
 private int listenPort;
-@Value("${worker.weight:100}")
-private int weight;
+@Value("${worker.host.weight:100}")
+private int hostWeight;
 @Value("${alert.listen.host:localhost}")
 private String alertListenHost;
@@ -115,12 +115,12 @@ public class WorkerConfig {
 this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
 }
-public int getWeight() {
-return weight;
+public int getHostWeight() {
+return hostWeight;
 }
-public void setWeight(int weight) {
-this.weight = weight;
+public void setHostWeight(int weight) {
+this.hostWeight = weight;
 }
 public String getAlertListenHost() {
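
One practical consequence of the property rename, shown as a minimal Spring-free sketch: a worker.properties that still sets the old `worker.weight` key no longer affects the worker, which falls back to the declared default of 100 until the key is renamed. The class below is hypothetical and only mimics the default-value behaviour of `@Value("${worker.host.weight:100}")`.

import java.util.Properties;

public class HostWeightFallbackSketch {

    static int resolveHostWeight(Properties props) {
        // Only the new key is consulted; an old "worker.weight" entry is ignored.
        return Integer.parseInt(props.getProperty("worker.host.weight", "100"));
    }

    public static void main(String[] args) {
        Properties oldStyle = new Properties();
        oldStyle.setProperty("worker.weight", "50");      // pre-rename key

        Properties newStyle = new Properties();
        newStyle.setProperty("worker.host.weight", "50"); // post-rename key

        System.out.println(resolveHostWeight(oldStyle));  // 100 (falls back to the default)
        System.out.println(resolveHostWeight(newStyle));  // 50
    }
}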

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java (33)

@@ -111,10 +111,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 * @return kill result
 */
 private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
+boolean processFlag = true;
 List<String> appIds = Collections.emptyList();
+int taskInstanceId = killCommand.getTaskInstanceId();
+TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
 try {
-int taskInstanceId = killCommand.getTaskInstanceId();
-TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
 Integer processId = taskExecutionContext.getProcessId();
 if (processId.equals(0)) {
 workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
@@ -128,18 +129,16 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
 OSUtils.exeCmd(cmd);
-// find log and kill yarn job
-appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
-taskExecutionContext.getLogPath(),
-taskExecutionContext.getExecutePath(),
-taskExecutionContext.getTenantCode());
-return Pair.of(true, appIds);
 } catch (Exception e) {
+processFlag = false;
 logger.error("kill task error", e);
 }
-return Pair.of(false, appIds);
+// find log and kill yarn job
+Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
+taskExecutionContext.getLogPath(),
+taskExecutionContext.getExecutePath(),
+taskExecutionContext.getTenantCode());
+return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
 }
 /**
@@ -170,26 +169,26 @@
 * @param logPath logPath
 * @param executePath executePath
 * @param tenantCode tenantCode
-* @return List<String> appIds
+* @return Pair<Boolean, List<String>> yarn kill result
 */
-private List<String> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
+private Pair<Boolean, List<String>> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
 LogClientService logClient = null;
 try {
 logClient = new LogClientService();
 logger.info("view log host : {},logPath : {}", host, logPath);
 String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+List<String> appIds = Collections.emptyList();
 if (StringUtils.isNotEmpty(log)) {
-List<String> appIds = LoggerUtils.getAppIds(log, logger);
+appIds = LoggerUtils.getAppIds(log, logger);
 if (StringUtils.isEmpty(executePath)) {
 logger.error("task instance execute path is empty");
 throw new RuntimeException("task instance execute path is empty");
 }
 if (appIds.size() > 0) {
 ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
-return appIds;
 }
 }
+return Pair.of(true, appIds);
 } catch (Exception e) {
 logger.error("kill yarn job error", e);
 } finally {
@@ -197,7 +196,7 @@ TaskKillProcessor implements NettyRequestProcessor {
 logClient.close();
 }
 }
-return Collections.EMPTY_LIST;
+return Pair.of(false, Collections.emptyList());
 }
 }
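
The net effect of the refactor: the yarn-job kill now runs even when the OS-level process kill throws, and the reported result is the conjunction of both steps. A standalone sketch of that combination, assuming the `Pair` in the diff is commons-lang3's `Pair` on the classpath; the application id and values below are invented for illustration.

import java.util.Collections;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

public class KillResultSketch {

    public static void main(String[] args) {
        // Suppose the OS-level process kill failed (the catch block set the flag to false) ...
        boolean processFlag = false;

        // ... but the yarn-job kill still ran and succeeded, returning the ids it cancelled.
        Pair<Boolean, List<String>> yarnResult =
                Pair.of(true, Collections.singletonList("application_1612345678901_0001"));

        // The overall result is only reported as killed if both steps succeeded.
        Pair<Boolean, List<String>> result =
                Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());

        System.out.println(result.getLeft());  // false
        System.out.println(result.getRight()); // [application_1612345678901_0001]
    }
}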

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java (2)

@@ -152,7 +152,7 @@ public class WorkerRegistry {
 String address = getLocalAddress();
 String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
-int weight = workerConfig.getWeight();
+int weight = workerConfig.getHostWeight();
 long workerStartTime = System.currentTimeMillis();
 for (String workGroup : this.workerGroups) {

dolphinscheduler-server/src/main/resources/worker.properties (4)

@@ -33,8 +33,8 @@
 # default worker groups
 #worker.groups=default
-# default worker weight
-#worker.weight=100
+# default worker host weight
+#worker.host.weight=100
 # alert server listener host
 alert.listen.host=localhost
