Browse Source

Merge remote-tracking branch 'upstream/dev' into fixFormatBug

merge upstream dev into local fixFormatBug branch
pull/2/head
loushang 5 years ago
parent
commit
9a7f262dee
  1. 11
      .github/workflows/ci_ut.yml
  2. 0
      docker/README.md
  3. 24
      docker/docker-compose.yml
  4. 771
      docker/postgres/docker-entrypoint-initdb/init.sql
  5. 7
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java
  6. 2
      dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl
  7. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  8. 130
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
  9. 6
      dolphinscheduler-dao/src/main/resources/application-dao.properties
  10. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
  11. 126
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java
  12. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
  13. 63
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/MonitorServer.java
  14. 85
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RunConfig.java
  15. 62
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java
  16. 75
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  17. 59
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
  18. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java
  19. 92
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java
  20. 0
      dolphinscheduler-server/src/main/resources/config/install_config.conf
  21. 0
      dolphinscheduler-server/src/main/resources/config/run_config.conf
  22. 4
      dolphinscheduler-server/src/main/resources/worker_logback.xml
  23. 131
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  24. 37
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java
  25. 92
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java
  26. 31
      install.sh
  27. 2
      pom.xml
  28. 34
      script/del-zk-node.py
  29. 124
      script/monitor-server.py
  30. 52
      script/monitor-server.sh
  31. 48
      script/remove-zk-node.sh

11
.github/workflows/ci_ut.yml

@ -16,6 +16,9 @@
#
on: ["pull_request"]
env:
DOCKER_DIR: ./docker
LOG_DIR: /tmp/dolphinscheduler
name: Test Coveralls Parallel
@ -35,6 +38,8 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Bootstrap database
run: cd ${DOCKER_DIR} && docker-compose up -d
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
@ -44,3 +49,9 @@ jobs:
export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
mvn test -Dmaven.test.skip=false cobertura:cobertura
CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
- name: Collect logs
run: |
mkdir -p ${LOG_DIR}
cd ${DOCKER_DIR}
docker-compose logs db > ${LOG_DIR}/db.txt
continue-on-error: true

0
docker/README.md

24
docker/docker-compose.yml

@ -0,0 +1,24 @@
version: '2'
services:
zookeeper:
image: zookeeper
restart: always
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
db:
image: postgres
container_name: postgres
environment:
- POSTGRES_USER=test
- POSTGRES_PASSWORD=test
- POSTGRES_DB=dolphinscheduler
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./postgres/docker-entrypoint-initdb:/docker-entrypoint-initdb.d
volumes:
pgdata:

771
docker/postgres/docker-entrypoint-initdb/init.sql

@ -0,0 +1,771 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME character varying(120) NOT NULL,
JOB_NAME character varying(200) NOT NULL,
JOB_GROUP character varying(200) NOT NULL,
DESCRIPTION character varying(250) NULL,
JOB_CLASS_NAME character varying(250) NOT NULL,
IS_DURABLE boolean NOT NULL,
IS_NONCONCURRENT boolean NOT NULL,
IS_UPDATE_DATA boolean NOT NULL,
REQUESTS_RECOVERY boolean NOT NULL,
JOB_DATA bytea NULL);
alter table QRTZ_JOB_DETAILS add primary key(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
JOB_NAME character varying(200) NOT NULL,
JOB_GROUP character varying(200) NOT NULL,
DESCRIPTION character varying(250) NULL,
NEXT_FIRE_TIME BIGINT NULL,
PREV_FIRE_TIME BIGINT NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE character varying(16) NOT NULL,
TRIGGER_TYPE character varying(8) NOT NULL,
START_TIME BIGINT NOT NULL,
END_TIME BIGINT NULL,
CALENDAR_NAME character varying(200) NULL,
MISFIRE_INSTR SMALLINT NULL,
JOB_DATA bytea NULL) ;
alter table QRTZ_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
REPEAT_COUNT BIGINT NOT NULL,
REPEAT_INTERVAL BIGINT NOT NULL,
TIMES_TRIGGERED BIGINT NOT NULL) ;
alter table QRTZ_SIMPLE_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
CRON_EXPRESSION character varying(120) NOT NULL,
TIME_ZONE_ID character varying(80)) ;
alter table QRTZ_CRON_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
STR_PROP_1 character varying(512) NULL,
STR_PROP_2 character varying(512) NULL,
STR_PROP_3 character varying(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 boolean NULL,
BOOL_PROP_2 boolean NULL) ;
alter table QRTZ_SIMPROP_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
BLOB_DATA bytea NULL) ;
alter table QRTZ_BLOB_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME character varying(120) NOT NULL,
CALENDAR_NAME character varying(200) NOT NULL,
CALENDAR bytea NOT NULL) ;
alter table QRTZ_CALENDARS add primary key(SCHED_NAME,CALENDAR_NAME);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL) ;
alter table QRTZ_PAUSED_TRIGGER_GRPS add primary key(SCHED_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
ENTRY_ID character varying(95) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
INSTANCE_NAME character varying(200) NOT NULL,
FIRED_TIME BIGINT NOT NULL,
SCHED_TIME BIGINT NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE character varying(16) NOT NULL,
JOB_NAME character varying(200) NULL,
JOB_GROUP character varying(200) NULL,
IS_NONCONCURRENT boolean NULL,
REQUESTS_RECOVERY boolean NULL) ;
alter table QRTZ_FIRED_TRIGGERS add primary key(SCHED_NAME,ENTRY_ID);
CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME character varying(120) NOT NULL,
INSTANCE_NAME character varying(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT NOT NULL,
CHECKIN_INTERVAL BIGINT NOT NULL) ;
alter table QRTZ_SCHEDULER_STATE add primary key(SCHED_NAME,INSTANCE_NAME);
CREATE TABLE QRTZ_LOCKS (
SCHED_NAME character varying(120) NOT NULL,
LOCK_NAME character varying(40) NOT NULL) ;
alter table QRTZ_LOCKS add primary key(SCHED_NAME,LOCK_NAME);
CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
--
-- Table structure for table t_ds_access_token
--
DROP TABLE IF EXISTS t_ds_access_token;
CREATE TABLE t_ds_access_token (
id int NOT NULL ,
user_id int DEFAULT NULL ,
token varchar(64) DEFAULT NULL ,
expire_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_alert
--
DROP TABLE IF EXISTS t_ds_alert;
CREATE TABLE t_ds_alert (
id int NOT NULL ,
title varchar(64) DEFAULT NULL ,
show_type int DEFAULT NULL ,
content text ,
alert_type int DEFAULT NULL ,
alert_status int DEFAULT '0' ,
log text ,
alertgroup_id int DEFAULT NULL ,
receivers text ,
receivers_cc text ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_alertgroup
--
DROP TABLE IF EXISTS t_ds_alertgroup;
CREATE TABLE t_ds_alertgroup (
id int NOT NULL ,
group_name varchar(255) DEFAULT NULL ,
group_type int DEFAULT NULL ,
description varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_command
--
DROP TABLE IF EXISTS t_ds_command;
CREATE TABLE t_ds_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
executor_id int DEFAULT NULL ,
dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_datasource
--
DROP TABLE IF EXISTS t_ds_datasource;
CREATE TABLE t_ds_datasource (
id int NOT NULL ,
name varchar(64) NOT NULL ,
note varchar(256) DEFAULT NULL ,
type int NOT NULL ,
user_id int NOT NULL ,
connection_params text NOT NULL ,
create_time timestamp NOT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_error_command
--
DROP TABLE IF EXISTS t_ds_error_command;
CREATE TABLE t_ds_error_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
executor_id int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
dependence text ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
message text ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_master_server
--
DROP TABLE IF EXISTS t_ds_master_server;
CREATE TABLE t_ds_master_server (
id int NOT NULL ,
host varchar(45) DEFAULT NULL ,
port int DEFAULT NULL ,
zk_directory varchar(64) DEFAULT NULL ,
res_info varchar(256) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
last_heartbeat_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_process_definition
--
DROP TABLE IF EXISTS t_ds_process_definition;
CREATE TABLE t_ds_process_definition (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
version int DEFAULT NULL ,
release_state int DEFAULT NULL ,
project_id int DEFAULT NULL ,
user_id int DEFAULT NULL ,
process_definition_json text ,
description text ,
global_params text ,
flag int DEFAULT NULL ,
locations text ,
connects text ,
receivers text ,
receivers_cc text ,
create_time timestamp DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
create index process_definition_index on t_ds_process_definition (project_id,id);
--
-- Table structure for table t_ds_process_instance
--
DROP TABLE IF EXISTS t_ds_process_instance;
CREATE TABLE t_ds_process_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
state int DEFAULT NULL ,
recovery int DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
run_times int DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
command_type int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
max_try_times int DEFAULT '0' ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
command_start_time timestamp DEFAULT NULL ,
global_params text ,
process_instance_json text ,
flag int DEFAULT '1' ,
update_time timestamp NULL ,
is_sub_process int DEFAULT '0' ,
executor_id int NOT NULL ,
locations text ,
connects text ,
history_cmd text ,
dependence_schedule_times text ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
PRIMARY KEY (id)
) ;
create index process_instance_index on t_ds_process_instance (process_definition_id,id);
create index start_time_index on t_ds_process_instance (start_time);
--
-- Table structure for table t_ds_project
--
DROP TABLE IF EXISTS t_ds_project;
CREATE TABLE t_ds_project (
id int NOT NULL ,
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
user_id int DEFAULT NULL ,
flag int DEFAULT '1' ,
create_time timestamp DEFAULT CURRENT_TIMESTAMP ,
update_time timestamp DEFAULT CURRENT_TIMESTAMP ,
PRIMARY KEY (id)
) ;
create index user_id_index on t_ds_project (user_id);
--
-- Table structure for table t_ds_queue
--
DROP TABLE IF EXISTS t_ds_queue;
CREATE TABLE t_ds_queue (
id int NOT NULL ,
queue_name varchar(64) DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_relation_datasource_user
--
DROP TABLE IF EXISTS t_ds_relation_datasource_user;
CREATE TABLE t_ds_relation_datasource_user (
id int NOT NULL ,
user_id int NOT NULL ,
datasource_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
;
--
-- Table structure for table t_ds_relation_process_instance
--
DROP TABLE IF EXISTS t_ds_relation_process_instance;
CREATE TABLE t_ds_relation_process_instance (
id int NOT NULL ,
parent_process_instance_id int DEFAULT NULL ,
parent_task_instance_id int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_relation_project_user
--
DROP TABLE IF EXISTS t_ds_relation_project_user;
CREATE TABLE t_ds_relation_project_user (
id int NOT NULL ,
user_id int NOT NULL ,
project_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
create index relation_project_user_id_index on t_ds_relation_project_user (user_id);
--
-- Table structure for table t_ds_relation_resources_user
--
DROP TABLE IF EXISTS t_ds_relation_resources_user;
CREATE TABLE t_ds_relation_resources_user (
id int NOT NULL ,
user_id int NOT NULL ,
resources_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_relation_udfs_user
--
DROP TABLE IF EXISTS t_ds_relation_udfs_user;
CREATE TABLE t_ds_relation_udfs_user (
id int NOT NULL ,
user_id int NOT NULL ,
udf_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
;
--
-- Table structure for table t_ds_relation_user_alertgroup
--
DROP TABLE IF EXISTS t_ds_relation_user_alertgroup;
CREATE TABLE t_ds_relation_user_alertgroup (
id int NOT NULL,
alertgroup_id int DEFAULT NULL,
user_id int DEFAULT NULL,
create_time timestamp DEFAULT NULL,
update_time timestamp DEFAULT NULL,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_resources
--
DROP TABLE IF EXISTS t_ds_resources;
CREATE TABLE t_ds_resources (
id int NOT NULL ,
alias varchar(64) DEFAULT NULL ,
file_name varchar(64) DEFAULT NULL ,
description varchar(256) DEFAULT NULL ,
user_id int DEFAULT NULL ,
type int DEFAULT NULL ,
size bigint DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
;
--
-- Table structure for table t_ds_schedules
--
DROP TABLE IF EXISTS t_ds_schedules;
CREATE TABLE t_ds_schedules (
id int NOT NULL ,
process_definition_id int NOT NULL ,
start_time timestamp NOT NULL ,
end_time timestamp NOT NULL ,
crontab varchar(256) NOT NULL ,
failure_strategy int NOT NULL ,
user_id int NOT NULL ,
release_state int NOT NULL ,
warning_type int NOT NULL ,
warning_group_id int DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
create_time timestamp NOT NULL ,
update_time timestamp NOT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_session
--
DROP TABLE IF EXISTS t_ds_session;
CREATE TABLE t_ds_session (
id varchar(64) NOT NULL ,
user_id int DEFAULT NULL ,
ip varchar(45) DEFAULT NULL ,
last_login_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_task_instance
--
DROP TABLE IF EXISTS t_ds_task_instance;
CREATE TABLE t_ds_task_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
task_type varchar(64) DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
task_json text ,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
execute_path varchar(200) DEFAULT NULL ,
log_path varchar(200) DEFAULT NULL ,
alert_flag int DEFAULT NULL ,
retry_times int DEFAULT '0' ,
pid int DEFAULT NULL ,
app_link varchar(255) DEFAULT NULL ,
flag int DEFAULT '1' ,
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_tenant
--
DROP TABLE IF EXISTS t_ds_tenant;
CREATE TABLE t_ds_tenant (
id int NOT NULL ,
tenant_code varchar(64) DEFAULT NULL ,
tenant_name varchar(64) DEFAULT NULL ,
description varchar(256) DEFAULT NULL ,
queue_id int DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_udfs
--
DROP TABLE IF EXISTS t_ds_udfs;
CREATE TABLE t_ds_udfs (
id int NOT NULL ,
user_id int NOT NULL ,
func_name varchar(100) NOT NULL ,
class_name varchar(255) NOT NULL ,
type int NOT NULL ,
arg_types varchar(255) DEFAULT NULL ,
database varchar(255) DEFAULT NULL ,
description varchar(255) DEFAULT NULL ,
resource_id int NOT NULL ,
resource_name varchar(255) NOT NULL ,
create_time timestamp NOT NULL ,
update_time timestamp NOT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_user
--
DROP TABLE IF EXISTS t_ds_user;
CREATE TABLE t_ds_user (
id int NOT NULL ,
user_name varchar(64) DEFAULT NULL ,
user_password varchar(64) DEFAULT NULL ,
user_type int DEFAULT NULL ,
email varchar(64) DEFAULT NULL ,
phone varchar(11) DEFAULT NULL ,
tenant_id int DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_version
--
DROP TABLE IF EXISTS t_ds_version;
CREATE TABLE t_ds_version (
id int NOT NULL ,
version varchar(200) NOT NULL,
PRIMARY KEY (id)
) ;
create index version_index on t_ds_version(version);
--
-- Table structure for table t_ds_worker_group
--
DROP TABLE IF EXISTS t_ds_worker_group;
CREATE TABLE t_ds_worker_group (
id bigint NOT NULL ,
name varchar(256) DEFAULT NULL ,
ip_list varchar(256) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_worker_server
--
DROP TABLE IF EXISTS t_ds_worker_server;
CREATE TABLE t_ds_worker_server (
id int NOT NULL ,
host varchar(45) DEFAULT NULL ,
port int DEFAULT NULL ,
zk_directory varchar(64) DEFAULT NULL ,
res_info varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
last_heartbeat_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
DROP SEQUENCE IF EXISTS t_ds_access_token_id_sequence;
CREATE SEQUENCE t_ds_access_token_id_sequence;
ALTER TABLE t_ds_access_token ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_access_token_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_alert_id_sequence;
CREATE SEQUENCE t_ds_alert_id_sequence;
ALTER TABLE t_ds_alert ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alert_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_alertgroup_id_sequence;
CREATE SEQUENCE t_ds_alertgroup_id_sequence;
ALTER TABLE t_ds_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alertgroup_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_command_id_sequence;
CREATE SEQUENCE t_ds_command_id_sequence;
ALTER TABLE t_ds_command ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_command_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_datasource_id_sequence;
CREATE SEQUENCE t_ds_datasource_id_sequence;
ALTER TABLE t_ds_datasource ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_datasource_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_master_server_id_sequence;
CREATE SEQUENCE t_ds_master_server_id_sequence;
ALTER TABLE t_ds_master_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_master_server_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_definition_id_sequence;
CREATE SEQUENCE t_ds_process_definition_id_sequence;
ALTER TABLE t_ds_process_definition ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence;
CREATE SEQUENCE t_ds_process_instance_id_sequence;
ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_project_id_sequence;
CREATE SEQUENCE t_ds_project_id_sequence;
ALTER TABLE t_ds_project ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_project_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_queue_id_sequence;
CREATE SEQUENCE t_ds_queue_id_sequence;
ALTER TABLE t_ds_queue ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_queue_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_datasource_user_id_sequence;
CREATE SEQUENCE t_ds_relation_datasource_user_id_sequence;
ALTER TABLE t_ds_relation_datasource_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_datasource_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_process_instance_id_sequence;
CREATE SEQUENCE t_ds_relation_process_instance_id_sequence;
ALTER TABLE t_ds_relation_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_process_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_project_user_id_sequence;
CREATE SEQUENCE t_ds_relation_project_user_id_sequence;
ALTER TABLE t_ds_relation_project_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_project_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_resources_user_id_sequence;
CREATE SEQUENCE t_ds_relation_resources_user_id_sequence;
ALTER TABLE t_ds_relation_resources_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_resources_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_udfs_user_id_sequence;
CREATE SEQUENCE t_ds_relation_udfs_user_id_sequence;
ALTER TABLE t_ds_relation_udfs_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_udfs_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_user_alertgroup_id_sequence;
CREATE SEQUENCE t_ds_relation_user_alertgroup_id_sequence;
ALTER TABLE t_ds_relation_user_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_user_alertgroup_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_resources_id_sequence;
CREATE SEQUENCE t_ds_resources_id_sequence;
ALTER TABLE t_ds_resources ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_resources_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_schedules_id_sequence;
CREATE SEQUENCE t_ds_schedules_id_sequence;
ALTER TABLE t_ds_schedules ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_schedules_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_task_instance_id_sequence;
CREATE SEQUENCE t_ds_task_instance_id_sequence;
ALTER TABLE t_ds_task_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_task_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_tenant_id_sequence;
CREATE SEQUENCE t_ds_tenant_id_sequence;
ALTER TABLE t_ds_tenant ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_tenant_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_udfs_id_sequence;
CREATE SEQUENCE t_ds_udfs_id_sequence;
ALTER TABLE t_ds_udfs ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_udfs_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_user_id_sequence;
CREATE SEQUENCE t_ds_user_id_sequence;
ALTER TABLE t_ds_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_version_id_sequence;
CREATE SEQUENCE t_ds_version_id_sequence;
ALTER TABLE t_ds_version ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_version_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_worker_group_id_sequence;
CREATE SEQUENCE t_ds_worker_group_id_sequence;
ALTER TABLE t_ds_worker_group ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_group_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_worker_server_id_sequence;
CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_user,user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group
INSERT INTO t_ds_alertgroup(group_name,group_type,description,create_time,update_time) VALUES ('dolphinscheduler warning group', '0', 'dolphinscheduler warning group','2018-11-29 10:20:39', '2018-11-29 10:20:39');
INSERT INTO t_ds_relation_user_alertgroup(alertgroup_id,user_id,create_time,update_time) VALUES ( '1', '1', '2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
INSERT INTO t_ds_queue(queue_name,queue,create_time,update_time) VALUES ('default', 'default','2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
INSERT INTO t_ds_version(version) VALUES ('1.2.0');

7
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java

@ -20,6 +20,8 @@ import org.apache.poi.hssf.usermodel.HSSFCell;
import org.apache.poi.hssf.usermodel.HSSFRow;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.HorizontalAlignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,10 +73,14 @@ public class ExcelUtils {
//set the height of the first line
row.setHeight((short)500);
//set Horizontal right
CellStyle cellStyle = wb.createCellStyle();
cellStyle.setAlignment(HorizontalAlignment.RIGHT);
//setting excel headers
for (int i = 0; i < headerList.size(); i++) {
HSSFCell cell = row.createCell(i);
cell.setCellStyle(cellStyle);
cell.setCellValue(headerList.get(i));
}
@ -88,6 +94,7 @@ public class ExcelUtils {
rowIndex++;
for (int j = 0 ; j < values.length ; j++){
HSSFCell cell1 = row.createCell(j);
cell1.setCellStyle(cellStyle);
cell1.setCellValue(String.valueOf(values[j]));
}
}

2
dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl

@ -14,4 +14,4 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE HTML PUBLIC '-//W3C//DTD HTML 4.01 Transitional//EN' 'http://www.w3.org/TR/html4/loose.dtd'><html><head><title> dolphinscheduler</title><meta name='Keywords' content=''><meta name='Description' content=''><style type="text/css">table { margin-top:0px; padding-top:0px; border:1px solid; font-size: 14px; color: #333333; border-width: 1px; border-color: #666666; border-collapse: collapse; } table th { border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #dedede; } table td { border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #ffffff; }</style></head><body style="margin:0;padding:0"><table border="1px" cellpadding="5px" cellspacing="-10px"><thead><#if title??> ${title}</#if></thead><#if content??> ${content}</#if></table></body></html>
<!DOCTYPE HTML PUBLIC '-//W3C//DTD HTML 4.01 Transitional//EN' 'http://www.w3.org/TR/html4/loose.dtd'><html><head><title> dolphinscheduler</title><meta name='Keywords' content=''><meta name='Description' content=''><style type="text/css">table { margin-top:0px; padding-top:0px; border:1px solid; font-size: 14px; color: #333333; border-width: 1px; border-color: #666666; border-collapse: collapse; } table th { border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #dedede; text-align: right;} table td { border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #ffffff; text-align: right;}</style></head><body style="margin:0;padding:0"><table border="1px" cellpadding="5px" cellspacing="-10px"><thead><#if title??> ${title}</#if></thead><#if content??> ${content}</#if></table></body></html>

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1005,4 +1005,10 @@ public final class Constants {
public static final String CLASS = "class";
public static final String RECEIVERS = "receivers";
public static final String RECEIVERS_CC = "receiversCc";
/**
* dataSource sensitive param
*/
public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))";
}

130
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.time.DateUtils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_TIME;
import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlaceholderUtils.replacePlaceholders;
public class ParameterUtilsTest {
public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class);
/**
* Test convertParameterPlaceholders
*/
@Test
public void testConvertParameterPlaceholders() throws Exception {
// parameterString,parameterMap is null
Assert.assertNull(ParameterUtils.convertParameterPlaceholders(null, null));
// parameterString is null,parameterMap is not null
Map<String, String> parameterMap = new HashMap<String,String>();
parameterMap.put("testParameter","testParameter");
Assert.assertNull(ParameterUtils.convertParameterPlaceholders(null, parameterMap));
// parameterString、parameterMap is not null
String parameterString = "test_parameter";
Assert.assertEquals(parameterString, ParameterUtils.convertParameterPlaceholders(parameterString, parameterMap));
//replace variable ${} form
parameterMap.put("testParameter2","${testParameter}");
Assert.assertEquals(parameterString,PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true));
// replace time $[...] form, eg. $[yyyyMMdd]
Date cronTime = new Date();
Assert.assertEquals(parameterString, replacePlaceholders(parameterString, cronTime, true));
// replace time $[...] form, eg. $[yyyyMMdd]
Date cronTimeStr = DateUtils.parseDate("20191220145900", new String[]{PARAMETER_FORMAT_TIME});
Assert.assertEquals(parameterString, replacePlaceholders(parameterString, cronTimeStr, true));
}
/**
* Test curingGlobalParams
*/
@Test
public void testCuringGlobalParams() throws Exception {
//define globalMap
Map<String, String> globalParamMap = new HashMap<>();
globalParamMap.put("globalParams1","Params1");
//define globalParamList
List<Property> globalParamList = new ArrayList<>();
//define scheduleTime
Date scheduleTime = DateUtils.parseDate("20191220145900", new String[]{PARAMETER_FORMAT_TIME});
//test globalParamList is null
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
Assert.assertNull(result);
Assert.assertNull(ParameterUtils.curingGlobalParams(null,null,CommandType.START_CURRENT_TASK_PROCESS,null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap,null,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime));
//test globalParamList is not null
Property property=new Property("testGlobalParam", Direct.IN, DataType.VARCHAR,"testGlobalParam");
globalParamList.add(property);
String result2 = ParameterUtils.curingGlobalParams(null,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime);
Assert.assertEquals(result2, JSONObject.toJSONString(globalParamList));
String result3 = ParameterUtils.curingGlobalParams(globalParamMap,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,null);
Assert.assertEquals(result3, JSONObject.toJSONString(globalParamList));
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
Assert.assertEquals(result4, JSONObject.toJSONString(globalParamList));
//test var $ startsWith
globalParamMap.put("bizDate","${system.biz.date}");
globalParamMap.put("b1zCurdate","${system.biz.curdate}");
Property property2=new Property("testParamList1", Direct.IN, DataType.VARCHAR,"testParamList");
Property property3=new Property("testParamList2", Direct.IN, DataType.VARCHAR,"{testParamList1}");
Property property4=new Property("testParamList3", Direct.IN, DataType.VARCHAR,"${b1zCurdate}");
globalParamList.add(property2);
globalParamList.add(property3);
globalParamList.add(property4);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
Assert.assertEquals(result5,JSONUtils.toJsonString(globalParamList));
}
/**
* Test handleEscapes
*/
@Test
public void testHandleEscapes() throws Exception {
Assert.assertNull(ParameterUtils.handleEscapes(null));
Assert.assertEquals("",ParameterUtils.handleEscapes(""));
Assert.assertEquals("test Parameter",ParameterUtils.handleEscapes("test Parameter"));
Assert.assertEquals("////%test////%Parameter",ParameterUtils.handleEscapes("%test%Parameter"));
}
}

6
dolphinscheduler-dao/src/main/resources/application-dao.properties

@ -19,12 +19,12 @@
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# postgre
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://192.168.xx.xx:5432/dolphinscheduler
spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
# mysql
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=xx
spring.datasource.password=xx
spring.datasource.username=test
spring.datasource.password=test
# connection configuration
spring.datasource.initialSize=5

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java

@ -72,7 +72,6 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
this.taskInstance.setState(ExecutionStatus.KILL);
}else{
this.taskInstance.setState(subProcessInstance.getState());
result = true;
}
}
taskInstance.setEndTime(new Date());

126
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* abstract server monitor and auto restart server
*/
@Component
public abstract class AbstractMonitor implements Monitor {
private static final Logger logger = LoggerFactory.getLogger(AbstractMonitor.class);
@Autowired
private RunConfig runConfig;
/**
* monitor server and restart
*/
@Override
public void monitor(String masterPath,String workerPath,Integer port,String installPath) {
try {
restartServer(masterPath,port,installPath);
restartServer(workerPath,port,installPath);
}catch (Exception e){
logger.error("server start up error",e);
}
}
private void restartServer(String path,Integer port,String installPath) throws Exception{
String type = path.split("/")[2];
String serverName = null;
String nodes = null;
if ("masters".equals(type)){
serverName = "master-server";
nodes = runConfig.getMasters();
}else if ("workers".equals(type)){
serverName = "worker-server";
nodes = runConfig.getWorkers();
}
Map<String, String> activeNodeMap = getActiveNodesByPath(path);
Set<String> needRestartServer = getNeedRestartServer(getRunConfigServer(nodes),
activeNodeMap.keySet());
for (String node : needRestartServer){
// os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server')
String runCmd = "ssh -p " + port + " " + node + " sh " + installPath + "/bin/dolphinscheduler-daemon.sh start " + serverName;
Runtime.getRuntime().exec(runCmd);
}
}
/**
* get need restart server
* @param deployedNodes deployedNodes
* @param activeNodes activeNodes
* @return need restart server
*/
private Set<String> getNeedRestartServer(Set<String> deployedNodes,Set<String> activeNodes){
if (CollectionUtils.isEmpty(activeNodes)){
return deployedNodes;
}
Set<String> result = new HashSet<>();
result.addAll(deployedNodes);
result.removeAll(activeNodes);
return result;
}
/**
* run config masters/workers
* @return master set/worker set
*/
private Set<String> getRunConfigServer(String nodes){
Set<String> nodeSet = new HashSet();
if (StringUtils.isEmpty(nodes)){
return null;
}
String[] nodeArr = nodes.split(",");
for (String node : nodeArr){
nodeSet.add(node);
}
return nodeSet;
}
/**
* get active nodes by path
* @param path path
* @return active nodes
*/
protected abstract Map<String,String> getActiveNodesByPath(String path);
}

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
/**
* server monitor and auto restart server
*/
public interface Monitor {
/**
* monitor server and restart
*/
void monitor(String masterPath, String workerPath, Integer port, String installPath);
}

63
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/MonitorServer.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
/**
* monitor server
*/
@ComponentScan("org.apache.dolphinscheduler")
public class MonitorServer implements CommandLineRunner {
private static Integer ARGS_LENGTH = 4;
private static final Logger logger = LoggerFactory.getLogger(MonitorServer.class);
/**
* monitor
*/
@Autowired
private Monitor monitor;
public static void main(String[] args) throws Exception{
new SpringApplicationBuilder(MonitorServer.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH){
logger.error("Usage: <masterPath> <workerPath> <port> <installPath>");
return;
}
String masterPath = args[0];
String workerPath = args[1];
Integer port = Integer.parseInt(args[2]);
String installPath = args[3];
monitor.monitor(masterPath,workerPath,port,installPath);
}
}

85
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RunConfig.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* zookeeper conf
*/
@Component
@PropertySource("classpath:config/run_config.conf")
public class RunConfig {
//zk connect config
@Value("${masters}")
private String masters;
@Value("${workers}")
private String workers;
@Value("${alertServer}")
private String alertServer;
@Value("${apiServers}")
private String apiServers;
@Value("${sshPort}")
private String sshPort;
public String getMasters() {
return masters;
}
public void setMasters(String masters) {
this.masters = masters;
}
public String getWorkers() {
return workers;
}
public void setWorkers(String workers) {
this.workers = workers;
}
public String getAlertServer() {
return alertServer;
}
public void setAlertServer(String alertServer) {
this.alertServer = alertServer;
}
public String getApiServers() {
return apiServers;
}
public void setApiServers(String apiServers) {
this.apiServers = apiServers;
}
public String getSshPort() {
return sshPort;
}
public void setSshPort(String sshPort) {
this.sshPort = sshPort;
}
}

62
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* zk monitor server impl
*/
@Component
public class ZKMonitorImpl extends AbstractMonitor {
/**
* zookeeper operator
*/
@Autowired
private ZookeeperOperator zookeeperOperator;
/**
* get active nodes map by path
* @param path path
* @return active nodes map
*/
@Override
protected Map<String,String> getActiveNodesByPath(String path) {
Map<String,String> maps = new HashMap<>();
List<String> childrenList = zookeeperOperator.getChildrenKeys(path);
if (childrenList == null){
return maps;
}
for (String child : childrenList){
maps.put(child.split("_")[0],child);
}
return maps;
}
}

75
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.commons.lang.StringUtils;
import org.slf4j.LoggerFactory;
@ -44,9 +45,11 @@ public class FlinkArgsUtils {
*/
public static List<String> buildArgs(FlinkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
if (StringUtils.isNotEmpty(param.getDeployMode())) {
deployMode = param.getDeployMode();
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
if (!"local".equals(deployMode)) {
@ -54,68 +57,70 @@ public class FlinkArgsUtils {
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
if (param.getSlot() != 0) {
int slot = param.getSlot();
if (slot != 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", param.getSlot())); //-ys
args.add(String.format("%d", slot)); //-ys
}
if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(Constants.FLINK_APP_NAME);
args.add(param.getAppName());
args.add(appName);
}
if (param.getTaskManager() != 0) { //-yn
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", param.getTaskManager()));
args.add(String.format("%d", taskManager));
}
if (StringUtils.isNotEmpty(param.getJobManagerMemory())) {
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(Constants.FLINK_JOB_MANAGE_MEM);
args.add(param.getJobManagerMemory()); //-yjm
args.add(jobManagerMemory); //-yjm
}
if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(Constants.FLINK_TASK_MANAGE_MEM);
args.add(param.getTaskManagerMemory());
args.add(taskManagerMemory);
}
args.add(Constants.FLINK_detach); //-d
}
if (param.getProgramType() != null) {
if (param.getProgramType() != ProgramType.PYTHON) {
if (StringUtils.isNotEmpty(param.getMainClass())) {
args.add(Constants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
if (param.getMainJar() != null) {
args.add(param.getMainJar().getRes());
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
if (StringUtils.isNotEmpty(param.getMainArgs())) {
args.add(param.getMainArgs());
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
// --files --conf --libjar ...
if (StringUtils.isNotEmpty(param.getOthers())) {
String others = param.getOthers();
if (!others.contains("--qu")) {
if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
String others = param.getOthers();
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
args.add(param.getOthers());
} else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) {
args.add(others);
} else if (StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) {
args.add(Constants.FLINK_QUEUE);
args.add(param.getQueue());
}
return args;

59
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan("org.apache.dolphinscheduler")
public class RemoveZKNode implements CommandLineRunner {
private static Integer ARGS_LENGTH = 1;
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
/**
* zookeeper operator
*/
@Autowired
private ZookeeperOperator zookeeperOperator;
public static void main(String[] args) {
new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH){
logger.error("Usage: <node>");
return;
}
zookeeperOperator.remove(args[0]);
zookeeperOperator.close();
}
}

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
/**
* sensitive log Util
*/
public class SensitiveLogUtil {
/**
* @param dataSourcePwd data source password
* @return String
*/
public static String maskDataSourcePwd(String dataSourcePwd){
if (StringUtils.isNotEmpty(dataSourcePwd)) {
dataSourcePwd = Constants.PASSWORD_DEFAULT;
}
return dataSourcePwd;
}
}

92
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
import ch.qos.logback.classic.pattern.MessageConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* sensitive data log converter
*/
@Slf4j
public class SensitiveDataConverter extends MessageConverter {
/**
* password pattern
*/
private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX);
@Override
public String convert(ILoggingEvent event) {
// get original log
String requestLogMsg = event.getFormattedMessage();
// desensitization log
return convertMsg(requestLogMsg);
}
/**
* deal with sensitive log
*
* @param oriLogMsg original log
*/
private String convertMsg(final String oriLogMsg) {
String tempLogMsg = oriLogMsg;
if (StringUtils.isNotEmpty(tempLogMsg)) {
tempLogMsg = passwordHandler(pwdPattern, tempLogMsg);
}
return tempLogMsg;
}
/**
* password regex
*
* @param logMsg original log
*/
private String passwordHandler(Pattern pwdPattern, String logMsg) {
Matcher matcher = pwdPattern.matcher(logMsg);
StringBuffer sb = new StringBuffer(logMsg.length());
while (matcher.find()) {
String password = matcher.group();
String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password);
matcher.appendReplacement(sb, maskPassword);
}
matcher.appendTail(sb);
return sb.toString();
}
}

0
script/config/install_config.conf → dolphinscheduler-server/src/main/resources/config/install_config.conf

0
script/config/run_config.conf → dolphinscheduler-server/src/main/resources/config/run_config.conf

4
dolphinscheduler-server/src/main/resources/worker_logback.xml

@ -18,6 +18,8 @@
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<conversionRule conversionWord="msg"
converterClass="org.apache.dolphinscheduler.server.worker.log.SensitiveDataConverter"/>
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
@ -31,7 +33,7 @@
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"></filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>

131
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test FlinkArgsUtils
*/
public class FlinkArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class);
public String mode = "cluster";
public int slot = 2;
public String appName = "testFlink";
public int taskManager = 4;
public String taskManagerMemory = "2G";
public String jobManagerMemory = "4G";
public ProgramType programType = ProgramType.JAVA;
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs";
public String queue = "queue1";
public String others = "--input file:///home";
@Before
public void setUp() throws Exception {
ResourceInfo main = new ResourceInfo();
main.setRes("testflink-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
FlinkParameters param = new FlinkParameters();
param.setDeployMode(mode);
param.setMainClass(mainClass);
param.setAppName(appName);
param.setSlot(slot);
param.setTaskManager(taskManager);
param.setJobManagerMemory(jobManagerMemory);
param.setTaskManagerMemory(taskManagerMemory);
param.setMainJar(mainJar);
param.setProgramType(programType);
param.setMainArgs(mainArgs);
param.setQueue(queue);
param.setOthers(others);
//Invoke buildArgs
List<String> result = FlinkArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(result.size(),20);
assertEquals(result.get(0),"-m");
assertEquals(result.get(1),"yarn-cluster");
assertEquals(result.get(2),"-ys");
assertSame(Integer.valueOf(result.get(3)),slot);
assertEquals(result.get(4),"-ynm");
assertEquals(result.get(5),appName);
assertEquals(result.get(6),"-yn");
assertSame(Integer.valueOf(result.get(7)),taskManager);
assertEquals(result.get(8),"-yjm");
assertEquals(result.get(9),jobManagerMemory);
assertEquals(result.get(10),"-ytm");
assertEquals(result.get(11),taskManagerMemory);
assertEquals(result.get(12),"-d");
assertEquals(result.get(13),"-c");
assertEquals(result.get(14),mainClass);
assertEquals(result.get(15),mainJar.getRes());
assertEquals(result.get(16),mainArgs);
assertEquals(result.get(17),"--qu");
assertEquals(result.get(18),queue);
assertEquals(result.get(19),others);
//Others param without --qu
FlinkParameters param1 = new FlinkParameters();
param1.setQueue(queue);
param1.setDeployMode(mode);
result = FlinkArgsUtils.buildArgs(param1);
assertEquals(result.size(),5);
}
}

37
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Test;
public class SensitiveLogUtilTest {
@Test
public void testMaskDataSourcePwd() {
String password = "123456";
String emptyPassword = "";
Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtil.maskDataSourcePwd(password));
Assert.assertEquals("", SensitiveLogUtil.maskDataSourcePwd(emptyPassword));
}
}

92
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SensitiveDataConverterTest {
private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class);
/**
* password pattern
*/
private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX);
/**
* mask sensitive logMsg - sql task datasource password
*/
@Test
public void testPwdLogMsgConverter() {
String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"view1\"}";
String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"******\"}";
logger.info("parameter : {}", logMsg);
logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg));
Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg));
Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg));
}
/**
* password regex test
*
* @param logMsg original log
*/
private static String passwordHandler(Pattern pattern, String logMsg) {
Matcher matcher = pattern.matcher(logMsg);
StringBuffer sb = new StringBuffer(logMsg.length());
while (matcher.find()) {
String password = matcher.group();
String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password);
matcher.appendReplacement(sb, maskPassword);
}
matcher.appendTail(sb);
return sb.toString();
}
}

31
install.sh

@ -469,8 +469,8 @@ sh ${workDir}/script/stop-all.sh
# 4,delete zk node
echo "4,delete zk node"
sleep 1
python ${workDir}/script/del-zk-node.py $zkQuorum $zkRoot
sh ${workDir}/script/remove-zk-node.sh $zkRoot
# 5,scp resources
echo "5,scp resources"
@ -485,29 +485,4 @@ fi
# 6,startup
echo "6,startup"
sh ${workDir}/script/start-all.sh
# 7,start monitoring self-starting script
monitor_pid=${workDir}/monitor_server.pid
if [ "true" = $monitorServerState ];then
if [ -f $monitor_pid ]; then
TARGET_PID=`cat $monitor_pid`
if kill -0 $TARGET_PID > /dev/null 2>&1; then
echo "monitor server running as process ${TARGET_PID}.Stopping"
kill $TARGET_PID
sleep 5
if kill -0 $TARGET_PID > /dev/null 2>&1; then
echo "monitor server did not stop gracefully after 5 seconds: killing with kill -9"
kill -9 $TARGET_PID
fi
else
echo "no monitor server to stop"
fi
echo "monitor server running as process ${TARGET_PID}.Stopped success"
rm -f $monitor_pid
fi
nohup python -u ${workDir}/script/monitor-server.py $installPath $zkQuorum $zkMasters $zkWorkers > ${workDir}/monitor-server.log 2>&1 &
echo $! > $monitor_pid
echo "start monitor server success as process `cat $monitor_pid`"
fi
sh ${workDir}/script/start-all.sh

2
pom.xml

@ -655,6 +655,8 @@
<include>**/alert/utils/JSONUtilsTest.java</include>
<include>**/alert/utils/PropertyUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
</includes>
<!-- <skip>true</skip> -->
</configuration>

34
script/del-zk-node.py

@ -1,34 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
import sys
from kazoo.client import KazooClient
class ZkClient:
def __init__(self):
self.zk = KazooClient(hosts=sys.argv[1])
self.zk.start()
def del_node(self):
self.zk.delete(sys.argv[2], recursive=True)
print('deleted success')
def __del__(self):
self.zk.stop()
if __name__ == '__main__':
zkclient = ZkClient()
zkclient.del_node()
time.sleep(2)

124
script/monitor-server.py

@ -1,124 +0,0 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
1, yum install pip
yum -y install python-pip
2, pip install kazoo
pip install kazoo
or
3, conda install kazoo
conda install -c conda-forge kazoo
run script and parameter description
nohup python -u monitor_server.py /data1_1T/dolphinscheduler 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 /dolphinscheduler/masters /dolphinscheduler/workers> monitor_server.log 2>&1 &
the parameters are as follows:
/data1_1T/dolphinscheduler : the value comes from the installPath in install.sh
192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 : the value comes from zkQuorum in install.sh
the value comes from zkWorkers in install.sh
/dolphinscheduler/masters : the value comes from zkMasters in install.sh
/dolphinscheduler/workers : the value comes from zkWorkers in install.sh
'''
import sys
import socket
import os
import sched
import time
from datetime import datetime
from kazoo.client import KazooClient
schedule = sched.scheduler(time.time, time.sleep)
class ZkClient:
def __init__(self):
# hosts configuration zk address cluster
self.zk = KazooClient(hosts=zookeepers)
self.zk.start()
# read configuration files and assemble them into a dictionary
def read_file(self,path):
with open(path, 'r') as f:
dict = {}
for line in f.readlines():
arr = line.strip().split('=')
if (len(arr) == 2):
dict[arr[0]] = arr[1]
return dict
# get the ip address according to hostname
def get_ip_by_hostname(self,hostname):
return socket.gethostbyname(hostname)
# restart server
def restart_server(self,inc):
config_dict = self.read_file(install_path + '/conf/config/run_config.conf')
master_list = config_dict.get('masters').split(',')
print master_list
master_list = list(map(lambda item : self.get_ip_by_hostname(item),master_list))
worker_list = config_dict.get('workers').split(',')
print worker_list
worker_list = list(map(lambda item: self.get_ip_by_hostname(item), worker_list))
ssh_port = config_dict.get("sshPort")
print ssh_port
if (self.zk.exists(masters_zk_path)):
zk_master_list = []
zk_master_nodes = self.zk.get_children(masters_zk_path)
for zk_master_node in zk_master_nodes:
zk_master_list.append(zk_master_node.split('_')[0])
restart_master_list = list(set(master_list) - set(zk_master_list))
if (len(restart_master_list) != 0):
for master in restart_master_list:
print("master " + self.get_ip_by_hostname(master) + " server has down")
os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server')
if (self.zk.exists(workers_zk_path)):
zk_worker_list = []
zk_worker_nodes = self.zk.get_children(workers_zk_path)
for zk_worker_node in zk_worker_nodes:
zk_worker_list.append(zk_worker_node.split('_')[0])
restart_worker_list = list(set(worker_list) - set(zk_worker_list))
if (len(restart_worker_list) != 0):
for worker in restart_worker_list:
print("worker " + self.get_ip_by_hostname(worker) + " server has down")
os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(worker) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start worker-server')
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
schedule.enter(inc, 0, self.restart_server, (inc,))
# default parameter 60s
def main(self,inc=60):
# the enter four parameters are: interval event, priority (sequence for simultaneous execution of two events arriving at the same time), function triggered by the call,
# the argument to the trigger function (tuple form)
schedule.enter(0, 0, self.restart_server, (inc,))
schedule.run()
if __name__ == '__main__':
if (len(sys.argv) < 4):
print('please input install_path,zookeepers,masters_zk_path and worker_zk_path')
install_path = sys.argv[1]
zookeepers = sys.argv[2]
masters_zk_path = sys.argv[3]
workers_zk_path = sys.argv[4]
zkClient = ZkClient()
zkClient.main(300)

52
script/monitor-server.sh

@ -0,0 +1,52 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
print_usage(){
printf $"USAGE:$0 masterPath workerPath port installPath\n"
exit 1
}
if [ $# -ne 4 ];then
print_usage
fi
masterPath=$1
workerPath=$2
port=$3
installPath=$4
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.server.monitor.MonitorServer
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $masterPath $workerPath $port $installPath"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command

48
script/remove-zk-node.sh

@ -0,0 +1,48 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
print_usage(){
printf $"USAGE:$0 rootNode\n"
exit 1
}
if [ $# -ne 1 ];then
print_usage
fi
rootNode=$1
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.server.utils.RemoveZKNode
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $rootNode"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
Loading…
Cancel
Save