diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index c893662c1d..c415d35619 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -16,6 +16,9 @@ # on: ["pull_request"] +env: + DOCKER_DIR: ./docker + LOG_DIR: /tmp/dolphinscheduler name: Test Coveralls Parallel @@ -35,6 +38,8 @@ jobs: key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- + - name: Bootstrap database + run: cd ${DOCKER_DIR} && docker-compose up -d - name: Set up JDK 1.8 uses: actions/setup-java@v1 with: @@ -44,3 +49,9 @@ jobs: export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g' mvn test -Dmaven.test.skip=false cobertura:cobertura CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash) + - name: Collect logs + run: | + mkdir -p ${LOG_DIR} + cd ${DOCKER_DIR} + docker-compose logs db > ${LOG_DIR}/db.txt + continue-on-error: true diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000000..de5908583c --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,24 @@ +version: '2' +services: + zookeeper: + image: zookeeper + restart: always + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + db: + image: postgres + container_name: postgres + environment: + - POSTGRES_USER=test + - POSTGRES_PASSWORD=test + - POSTGRES_DB=dolphinscheduler + ports: + - "5432:5432" + volumes: + - pgdata:/var/lib/postgresql/data + - ./postgres/docker-entrypoint-initdb:/docker-entrypoint-initdb.d +volumes: + pgdata: diff --git a/docker/postgres/docker-entrypoint-initdb/init.sql b/docker/postgres/docker-entrypoint-initdb/init.sql new file mode 100755 index 0000000000..b3c61ebce4 --- /dev/null +++ b/docker/postgres/docker-entrypoint-initdb/init.sql @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS; +DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE; +DROP TABLE IF EXISTS QRTZ_LOCKS; +DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_JOB_DETAILS; +DROP TABLE IF EXISTS QRTZ_CALENDARS; + +CREATE TABLE QRTZ_JOB_DETAILS( +SCHED_NAME character varying(120) NOT NULL, +JOB_NAME character varying(200) NOT NULL, +JOB_GROUP character varying(200) NOT NULL, +DESCRIPTION character varying(250) NULL, +JOB_CLASS_NAME character varying(250) NOT NULL, +IS_DURABLE boolean NOT NULL, +IS_NONCONCURRENT boolean NOT NULL, +IS_UPDATE_DATA boolean NOT NULL, +REQUESTS_RECOVERY boolean NOT NULL, +JOB_DATA bytea NULL); +alter table QRTZ_JOB_DETAILS add primary key(SCHED_NAME,JOB_NAME,JOB_GROUP); + +CREATE TABLE QRTZ_TRIGGERS ( +SCHED_NAME character varying(120) NOT NULL, +TRIGGER_NAME character varying(200) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL, +JOB_NAME character varying(200) NOT NULL, +JOB_GROUP character varying(200) NOT NULL, +DESCRIPTION character varying(250) NULL, +NEXT_FIRE_TIME BIGINT NULL, +PREV_FIRE_TIME BIGINT NULL, +PRIORITY INTEGER NULL, +TRIGGER_STATE character varying(16) NOT NULL, +TRIGGER_TYPE character varying(8) NOT NULL, +START_TIME BIGINT NOT NULL, +END_TIME BIGINT NULL, +CALENDAR_NAME character varying(200) NULL, +MISFIRE_INSTR SMALLINT NULL, +JOB_DATA bytea NULL) ; +alter table QRTZ_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_SIMPLE_TRIGGERS ( +SCHED_NAME character varying(120) NOT NULL, +TRIGGER_NAME character varying(200) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL, +REPEAT_COUNT BIGINT NOT NULL, +REPEAT_INTERVAL BIGINT NOT NULL, +TIMES_TRIGGERED BIGINT NOT NULL) ; +alter table QRTZ_SIMPLE_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_CRON_TRIGGERS ( +SCHED_NAME character varying(120) NOT NULL, +TRIGGER_NAME character varying(200) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL, +CRON_EXPRESSION character varying(120) NOT NULL, +TIME_ZONE_ID character varying(80)) ; +alter table QRTZ_CRON_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_SIMPROP_TRIGGERS + ( + SCHED_NAME character varying(120) NOT NULL, + TRIGGER_NAME character varying(200) NOT NULL, + TRIGGER_GROUP character varying(200) NOT NULL, + STR_PROP_1 character varying(512) NULL, + STR_PROP_2 character varying(512) NULL, + STR_PROP_3 character varying(512) NULL, + INT_PROP_1 INT NULL, + INT_PROP_2 INT NULL, + LONG_PROP_1 BIGINT NULL, + LONG_PROP_2 BIGINT NULL, + DEC_PROP_1 NUMERIC(13,4) NULL, + DEC_PROP_2 NUMERIC(13,4) NULL, + BOOL_PROP_1 boolean NULL, + BOOL_PROP_2 boolean NULL) ; +alter table QRTZ_SIMPROP_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_BLOB_TRIGGERS ( +SCHED_NAME character varying(120) NOT NULL, +TRIGGER_NAME character varying(200) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL, +BLOB_DATA bytea NULL) ; +alter table QRTZ_BLOB_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_CALENDARS ( +SCHED_NAME character varying(120) NOT NULL, +CALENDAR_NAME character varying(200) NOT NULL, +CALENDAR bytea NOT NULL) ; +alter table QRTZ_CALENDARS add primary key(SCHED_NAME,CALENDAR_NAME); + +CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS ( +SCHED_NAME character varying(120) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL) ; +alter table QRTZ_PAUSED_TRIGGER_GRPS add primary key(SCHED_NAME,TRIGGER_GROUP); + +CREATE TABLE QRTZ_FIRED_TRIGGERS ( +SCHED_NAME character varying(120) NOT NULL, +ENTRY_ID character varying(95) NOT NULL, +TRIGGER_NAME character varying(200) NOT NULL, +TRIGGER_GROUP character varying(200) NOT NULL, +INSTANCE_NAME character varying(200) NOT NULL, +FIRED_TIME BIGINT NOT NULL, +SCHED_TIME BIGINT NOT NULL, +PRIORITY INTEGER NOT NULL, +STATE character varying(16) NOT NULL, +JOB_NAME character varying(200) NULL, +JOB_GROUP character varying(200) NULL, +IS_NONCONCURRENT boolean NULL, +REQUESTS_RECOVERY boolean NULL) ; +alter table QRTZ_FIRED_TRIGGERS add primary key(SCHED_NAME,ENTRY_ID); + +CREATE TABLE QRTZ_SCHEDULER_STATE ( +SCHED_NAME character varying(120) NOT NULL, +INSTANCE_NAME character varying(200) NOT NULL, +LAST_CHECKIN_TIME BIGINT NOT NULL, +CHECKIN_INTERVAL BIGINT NOT NULL) ; +alter table QRTZ_SCHEDULER_STATE add primary key(SCHED_NAME,INSTANCE_NAME); + +CREATE TABLE QRTZ_LOCKS ( +SCHED_NAME character varying(120) NOT NULL, +LOCK_NAME character varying(40) NOT NULL) ; +alter table QRTZ_LOCKS add primary key(SCHED_NAME,LOCK_NAME); + +CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP); + +CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME); +CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE); + +CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME); +CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); + + +-- +-- Table structure for table t_ds_access_token +-- + +DROP TABLE IF EXISTS t_ds_access_token; +CREATE TABLE t_ds_access_token ( + id int NOT NULL , + user_id int DEFAULT NULL , + token varchar(64) DEFAULT NULL , + expire_time timestamp DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_alert +-- + +DROP TABLE IF EXISTS t_ds_alert; +CREATE TABLE t_ds_alert ( + id int NOT NULL , + title varchar(64) DEFAULT NULL , + show_type int DEFAULT NULL , + content text , + alert_type int DEFAULT NULL , + alert_status int DEFAULT '0' , + log text , + alertgroup_id int DEFAULT NULL , + receivers text , + receivers_cc text , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; +-- +-- Table structure for table t_ds_alertgroup +-- + +DROP TABLE IF EXISTS t_ds_alertgroup; +CREATE TABLE t_ds_alertgroup ( + id int NOT NULL , + group_name varchar(255) DEFAULT NULL , + group_type int DEFAULT NULL , + description varchar(255) DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_command +-- + +DROP TABLE IF EXISTS t_ds_command; +CREATE TABLE t_ds_command ( + id int NOT NULL , + command_type int DEFAULT NULL , + process_definition_id int DEFAULT NULL , + command_param text , + task_depend_type int DEFAULT NULL , + failure_strategy int DEFAULT '0' , + warning_type int DEFAULT '0' , + warning_group_id int DEFAULT NULL , + schedule_time timestamp DEFAULT NULL , + start_time timestamp DEFAULT NULL , + executor_id int DEFAULT NULL , + dependence varchar(255) DEFAULT NULL , + update_time timestamp DEFAULT NULL , + process_instance_priority int DEFAULT NULL , + worker_group_id int DEFAULT '-1' , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_datasource +-- + +DROP TABLE IF EXISTS t_ds_datasource; +CREATE TABLE t_ds_datasource ( + id int NOT NULL , + name varchar(64) NOT NULL , + note varchar(256) DEFAULT NULL , + type int NOT NULL , + user_id int NOT NULL , + connection_params text NOT NULL , + create_time timestamp NOT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_error_command +-- + +DROP TABLE IF EXISTS t_ds_error_command; +CREATE TABLE t_ds_error_command ( + id int NOT NULL , + command_type int DEFAULT NULL , + executor_id int DEFAULT NULL , + process_definition_id int DEFAULT NULL , + command_param text , + task_depend_type int DEFAULT NULL , + failure_strategy int DEFAULT '0' , + warning_type int DEFAULT '0' , + warning_group_id int DEFAULT NULL , + schedule_time timestamp DEFAULT NULL , + start_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + dependence text , + process_instance_priority int DEFAULT NULL , + worker_group_id int DEFAULT '-1' , + message text , + PRIMARY KEY (id) +); +-- +-- Table structure for table t_ds_master_server +-- + +DROP TABLE IF EXISTS t_ds_master_server; +CREATE TABLE t_ds_master_server ( + id int NOT NULL , + host varchar(45) DEFAULT NULL , + port int DEFAULT NULL , + zk_directory varchar(64) DEFAULT NULL , + res_info varchar(256) DEFAULT NULL , + create_time timestamp DEFAULT NULL , + last_heartbeat_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_process_definition +-- + +DROP TABLE IF EXISTS t_ds_process_definition; +CREATE TABLE t_ds_process_definition ( + id int NOT NULL , + name varchar(255) DEFAULT NULL , + version int DEFAULT NULL , + release_state int DEFAULT NULL , + project_id int DEFAULT NULL , + user_id int DEFAULT NULL , + process_definition_json text , + description text , + global_params text , + flag int DEFAULT NULL , + locations text , + connects text , + receivers text , + receivers_cc text , + create_time timestamp DEFAULT NULL , + timeout int DEFAULT '0' , + tenant_id int NOT NULL DEFAULT '-1' , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +create index process_definition_index on t_ds_process_definition (project_id,id); + +-- +-- Table structure for table t_ds_process_instance +-- + +DROP TABLE IF EXISTS t_ds_process_instance; +CREATE TABLE t_ds_process_instance ( + id int NOT NULL , + name varchar(255) DEFAULT NULL , + process_definition_id int DEFAULT NULL , + state int DEFAULT NULL , + recovery int DEFAULT NULL , + start_time timestamp DEFAULT NULL , + end_time timestamp DEFAULT NULL , + run_times int DEFAULT NULL , + host varchar(45) DEFAULT NULL , + command_type int DEFAULT NULL , + command_param text , + task_depend_type int DEFAULT NULL , + max_try_times int DEFAULT '0' , + failure_strategy int DEFAULT '0' , + warning_type int DEFAULT '0' , + warning_group_id int DEFAULT NULL , + schedule_time timestamp DEFAULT NULL , + command_start_time timestamp DEFAULT NULL , + global_params text , + process_instance_json text , + flag int DEFAULT '1' , + update_time timestamp NULL , + is_sub_process int DEFAULT '0' , + executor_id int NOT NULL , + locations text , + connects text , + history_cmd text , + dependence_schedule_times text , + process_instance_priority int DEFAULT NULL , + worker_group_id int DEFAULT '-1' , + timeout int DEFAULT '0' , + tenant_id int NOT NULL DEFAULT '-1' , + PRIMARY KEY (id) +) ; + create index process_instance_index on t_ds_process_instance (process_definition_id,id); + create index start_time_index on t_ds_process_instance (start_time); + +-- +-- Table structure for table t_ds_project +-- + +DROP TABLE IF EXISTS t_ds_project; +CREATE TABLE t_ds_project ( + id int NOT NULL , + name varchar(100) DEFAULT NULL , + description varchar(200) DEFAULT NULL , + user_id int DEFAULT NULL , + flag int DEFAULT '1' , + create_time timestamp DEFAULT CURRENT_TIMESTAMP , + update_time timestamp DEFAULT CURRENT_TIMESTAMP , + PRIMARY KEY (id) +) ; + create index user_id_index on t_ds_project (user_id); + +-- +-- Table structure for table t_ds_queue +-- + +DROP TABLE IF EXISTS t_ds_queue; +CREATE TABLE t_ds_queue ( + id int NOT NULL , + queue_name varchar(64) DEFAULT NULL , + queue varchar(64) DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +); + + +-- +-- Table structure for table t_ds_relation_datasource_user +-- + +DROP TABLE IF EXISTS t_ds_relation_datasource_user; +CREATE TABLE t_ds_relation_datasource_user ( + id int NOT NULL , + user_id int NOT NULL , + datasource_id int DEFAULT NULL , + perm int DEFAULT '1' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; +; + +-- +-- Table structure for table t_ds_relation_process_instance +-- + +DROP TABLE IF EXISTS t_ds_relation_process_instance; +CREATE TABLE t_ds_relation_process_instance ( + id int NOT NULL , + parent_process_instance_id int DEFAULT NULL , + parent_task_instance_id int DEFAULT NULL , + process_instance_id int DEFAULT NULL , + PRIMARY KEY (id) +) ; + + +-- +-- Table structure for table t_ds_relation_project_user +-- + +DROP TABLE IF EXISTS t_ds_relation_project_user; +CREATE TABLE t_ds_relation_project_user ( + id int NOT NULL , + user_id int NOT NULL , + project_id int DEFAULT NULL , + perm int DEFAULT '1' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; +create index relation_project_user_id_index on t_ds_relation_project_user (user_id); + +-- +-- Table structure for table t_ds_relation_resources_user +-- + +DROP TABLE IF EXISTS t_ds_relation_resources_user; +CREATE TABLE t_ds_relation_resources_user ( + id int NOT NULL , + user_id int NOT NULL , + resources_id int DEFAULT NULL , + perm int DEFAULT '1' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_relation_udfs_user +-- + +DROP TABLE IF EXISTS t_ds_relation_udfs_user; +CREATE TABLE t_ds_relation_udfs_user ( + id int NOT NULL , + user_id int NOT NULL , + udf_id int DEFAULT NULL , + perm int DEFAULT '1' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; +; + +-- +-- Table structure for table t_ds_relation_user_alertgroup +-- + +DROP TABLE IF EXISTS t_ds_relation_user_alertgroup; +CREATE TABLE t_ds_relation_user_alertgroup ( + id int NOT NULL, + alertgroup_id int DEFAULT NULL, + user_id int DEFAULT NULL, + create_time timestamp DEFAULT NULL, + update_time timestamp DEFAULT NULL, + PRIMARY KEY (id) +); + +-- +-- Table structure for table t_ds_resources +-- + +DROP TABLE IF EXISTS t_ds_resources; +CREATE TABLE t_ds_resources ( + id int NOT NULL , + alias varchar(64) DEFAULT NULL , + file_name varchar(64) DEFAULT NULL , + description varchar(256) DEFAULT NULL , + user_id int DEFAULT NULL , + type int DEFAULT NULL , + size bigint DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; +; + +-- +-- Table structure for table t_ds_schedules +-- + +DROP TABLE IF EXISTS t_ds_schedules; +CREATE TABLE t_ds_schedules ( + id int NOT NULL , + process_definition_id int NOT NULL , + start_time timestamp NOT NULL , + end_time timestamp NOT NULL , + crontab varchar(256) NOT NULL , + failure_strategy int NOT NULL , + user_id int NOT NULL , + release_state int NOT NULL , + warning_type int NOT NULL , + warning_group_id int DEFAULT NULL , + process_instance_priority int DEFAULT NULL , + worker_group_id int DEFAULT '-1' , + create_time timestamp NOT NULL , + update_time timestamp NOT NULL , + PRIMARY KEY (id) +); + +-- +-- Table structure for table t_ds_session +-- + +DROP TABLE IF EXISTS t_ds_session; +CREATE TABLE t_ds_session ( + id varchar(64) NOT NULL , + user_id int DEFAULT NULL , + ip varchar(45) DEFAULT NULL , + last_login_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +); + +-- +-- Table structure for table t_ds_task_instance +-- + +DROP TABLE IF EXISTS t_ds_task_instance; +CREATE TABLE t_ds_task_instance ( + id int NOT NULL , + name varchar(255) DEFAULT NULL , + task_type varchar(64) DEFAULT NULL , + process_definition_id int DEFAULT NULL , + process_instance_id int DEFAULT NULL , + task_json text , + state int DEFAULT NULL , + submit_time timestamp DEFAULT NULL , + start_time timestamp DEFAULT NULL , + end_time timestamp DEFAULT NULL , + host varchar(45) DEFAULT NULL , + execute_path varchar(200) DEFAULT NULL , + log_path varchar(200) DEFAULT NULL , + alert_flag int DEFAULT NULL , + retry_times int DEFAULT '0' , + pid int DEFAULT NULL , + app_link varchar(255) DEFAULT NULL , + flag int DEFAULT '1' , + retry_interval int DEFAULT NULL , + max_retry_times int DEFAULT NULL , + task_instance_priority int DEFAULT NULL , + worker_group_id int DEFAULT '-1' , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_tenant +-- + +DROP TABLE IF EXISTS t_ds_tenant; +CREATE TABLE t_ds_tenant ( + id int NOT NULL , + tenant_code varchar(64) DEFAULT NULL , + tenant_name varchar(64) DEFAULT NULL , + description varchar(256) DEFAULT NULL , + queue_id int DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_udfs +-- + +DROP TABLE IF EXISTS t_ds_udfs; +CREATE TABLE t_ds_udfs ( + id int NOT NULL , + user_id int NOT NULL , + func_name varchar(100) NOT NULL , + class_name varchar(255) NOT NULL , + type int NOT NULL , + arg_types varchar(255) DEFAULT NULL , + database varchar(255) DEFAULT NULL , + description varchar(255) DEFAULT NULL , + resource_id int NOT NULL , + resource_name varchar(255) NOT NULL , + create_time timestamp NOT NULL , + update_time timestamp NOT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_user +-- + +DROP TABLE IF EXISTS t_ds_user; +CREATE TABLE t_ds_user ( + id int NOT NULL , + user_name varchar(64) DEFAULT NULL , + user_password varchar(64) DEFAULT NULL , + user_type int DEFAULT NULL , + email varchar(64) DEFAULT NULL , + phone varchar(11) DEFAULT NULL , + tenant_id int DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + queue varchar(64) DEFAULT NULL , + PRIMARY KEY (id) +); + +-- +-- Table structure for table t_ds_version +-- + +DROP TABLE IF EXISTS t_ds_version; +CREATE TABLE t_ds_version ( + id int NOT NULL , + version varchar(200) NOT NULL, + PRIMARY KEY (id) +) ; +create index version_index on t_ds_version(version); + +-- +-- Table structure for table t_ds_worker_group +-- + +DROP TABLE IF EXISTS t_ds_worker_group; +CREATE TABLE t_ds_worker_group ( + id bigint NOT NULL , + name varchar(256) DEFAULT NULL , + ip_list varchar(256) DEFAULT NULL , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + +-- +-- Table structure for table t_ds_worker_server +-- + +DROP TABLE IF EXISTS t_ds_worker_server; +CREATE TABLE t_ds_worker_server ( + id int NOT NULL , + host varchar(45) DEFAULT NULL , + port int DEFAULT NULL , + zk_directory varchar(64) DEFAULT NULL , + res_info varchar(255) DEFAULT NULL , + create_time timestamp DEFAULT NULL , + last_heartbeat_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +) ; + + +DROP SEQUENCE IF EXISTS t_ds_access_token_id_sequence; +CREATE SEQUENCE t_ds_access_token_id_sequence; +ALTER TABLE t_ds_access_token ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_access_token_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_alert_id_sequence; +CREATE SEQUENCE t_ds_alert_id_sequence; +ALTER TABLE t_ds_alert ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alert_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_alertgroup_id_sequence; +CREATE SEQUENCE t_ds_alertgroup_id_sequence; +ALTER TABLE t_ds_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alertgroup_id_sequence'); + +DROP SEQUENCE IF EXISTS t_ds_command_id_sequence; +CREATE SEQUENCE t_ds_command_id_sequence; +ALTER TABLE t_ds_command ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_command_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_datasource_id_sequence; +CREATE SEQUENCE t_ds_datasource_id_sequence; +ALTER TABLE t_ds_datasource ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_datasource_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_master_server_id_sequence; +CREATE SEQUENCE t_ds_master_server_id_sequence; +ALTER TABLE t_ds_master_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_master_server_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_process_definition_id_sequence; +CREATE SEQUENCE t_ds_process_definition_id_sequence; +ALTER TABLE t_ds_process_definition ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence; +CREATE SEQUENCE t_ds_process_instance_id_sequence; +ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_instance_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_project_id_sequence; +CREATE SEQUENCE t_ds_project_id_sequence; +ALTER TABLE t_ds_project ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_project_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_queue_id_sequence; +CREATE SEQUENCE t_ds_queue_id_sequence; +ALTER TABLE t_ds_queue ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_queue_id_sequence'); + +DROP SEQUENCE IF EXISTS t_ds_relation_datasource_user_id_sequence; +CREATE SEQUENCE t_ds_relation_datasource_user_id_sequence; +ALTER TABLE t_ds_relation_datasource_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_datasource_user_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_relation_process_instance_id_sequence; +CREATE SEQUENCE t_ds_relation_process_instance_id_sequence; +ALTER TABLE t_ds_relation_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_process_instance_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_relation_project_user_id_sequence; +CREATE SEQUENCE t_ds_relation_project_user_id_sequence; +ALTER TABLE t_ds_relation_project_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_project_user_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_relation_resources_user_id_sequence; +CREATE SEQUENCE t_ds_relation_resources_user_id_sequence; +ALTER TABLE t_ds_relation_resources_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_resources_user_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_relation_udfs_user_id_sequence; +CREATE SEQUENCE t_ds_relation_udfs_user_id_sequence; +ALTER TABLE t_ds_relation_udfs_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_udfs_user_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_relation_user_alertgroup_id_sequence; +CREATE SEQUENCE t_ds_relation_user_alertgroup_id_sequence; +ALTER TABLE t_ds_relation_user_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_user_alertgroup_id_sequence'); + +DROP SEQUENCE IF EXISTS t_ds_resources_id_sequence; +CREATE SEQUENCE t_ds_resources_id_sequence; +ALTER TABLE t_ds_resources ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_resources_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_schedules_id_sequence; +CREATE SEQUENCE t_ds_schedules_id_sequence; +ALTER TABLE t_ds_schedules ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_schedules_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_task_instance_id_sequence; +CREATE SEQUENCE t_ds_task_instance_id_sequence; +ALTER TABLE t_ds_task_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_task_instance_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_tenant_id_sequence; +CREATE SEQUENCE t_ds_tenant_id_sequence; +ALTER TABLE t_ds_tenant ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_tenant_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_udfs_id_sequence; +CREATE SEQUENCE t_ds_udfs_id_sequence; +ALTER TABLE t_ds_udfs ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_udfs_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_user_id_sequence; +CREATE SEQUENCE t_ds_user_id_sequence; +ALTER TABLE t_ds_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_user_id_sequence'); + +DROP SEQUENCE IF EXISTS t_ds_version_id_sequence; +CREATE SEQUENCE t_ds_version_id_sequence; +ALTER TABLE t_ds_version ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_version_id_sequence'); + +DROP SEQUENCE IF EXISTS t_ds_worker_group_id_sequence; +CREATE SEQUENCE t_ds_worker_group_id_sequence; +ALTER TABLE t_ds_worker_group ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_group_id_sequence'); +DROP SEQUENCE IF EXISTS t_ds_worker_server_id_sequence; +CREATE SEQUENCE t_ds_worker_server_id_sequence; +ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); + + +-- Records of t_ds_user,user : admin , password : dolphinscheduler123 +INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); + +-- Records of t_ds_alertgroup,dolphinscheduler warning group +INSERT INTO t_ds_alertgroup(group_name,group_type,description,create_time,update_time) VALUES ('dolphinscheduler warning group', '0', 'dolphinscheduler warning group','2018-11-29 10:20:39', '2018-11-29 10:20:39'); +INSERT INTO t_ds_relation_user_alertgroup(alertgroup_id,user_id,create_time,update_time) VALUES ( '1', '1', '2018-11-29 10:22:33', '2018-11-29 10:22:33'); + +-- Records of t_ds_queue,default queue name : default +INSERT INTO t_ds_queue(queue_name,queue,create_time,update_time) VALUES ('default', 'default','2018-11-29 10:22:33', '2018-11-29 10:22:33'); + +-- Records of t_ds_queue,default queue name : default +INSERT INTO t_ds_version(version) VALUES ('1.2.0'); \ No newline at end of file diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java index 2aee3d4ef6..c051450841 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java @@ -20,6 +20,8 @@ import org.apache.poi.hssf.usermodel.HSSFCell; import org.apache.poi.hssf.usermodel.HSSFRow; import org.apache.poi.hssf.usermodel.HSSFSheet; import org.apache.poi.hssf.usermodel.HSSFWorkbook; +import org.apache.poi.ss.usermodel.CellStyle; +import org.apache.poi.ss.usermodel.HorizontalAlignment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,10 +73,14 @@ public class ExcelUtils { //set the height of the first line row.setHeight((short)500); + //set Horizontal right + CellStyle cellStyle = wb.createCellStyle(); + cellStyle.setAlignment(HorizontalAlignment.RIGHT); //setting excel headers for (int i = 0; i < headerList.size(); i++) { HSSFCell cell = row.createCell(i); + cell.setCellStyle(cellStyle); cell.setCellValue(headerList.get(i)); } @@ -88,6 +94,7 @@ public class ExcelUtils { rowIndex++; for (int j = 0 ; j < values.length ; j++){ HSSFCell cell1 = row.createCell(j); + cell1.setCellStyle(cellStyle); cell1.setCellValue(String.valueOf(values[j])); } } diff --git a/dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl b/dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl index c638609090..1ca9cab17e 100644 --- a/dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl +++ b/dolphinscheduler-alert/src/main/resources/mail_templates/alert_mail_template.ftl @@ -14,4 +14,4 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - dolphinscheduler<#if title??> ${title}<#if content??> ${content}
\ No newline at end of file + dolphinscheduler<#if title??> ${title}<#if content??> ${content}
\ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 339cb6c548..6073f218a4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -1005,4 +1005,10 @@ public final class Constants { public static final String CLASS = "class"; public static final String RECEIVERS = "receivers"; public static final String RECEIVERS_CC = "receiversCc"; + + + /** + * dataSource sensitive param + */ + public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))"; } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java new file mode 100644 index 0000000000..8bb64b03c8 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang.time.DateUtils; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.*; +import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_TIME; +import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlaceholderUtils.replacePlaceholders; + + +public class ParameterUtilsTest { + public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class); + + /** + * Test convertParameterPlaceholders + */ + @Test + public void testConvertParameterPlaceholders() throws Exception { + // parameterString,parameterMap is null + Assert.assertNull(ParameterUtils.convertParameterPlaceholders(null, null)); + + // parameterString is null,parameterMap is not null + Map parameterMap = new HashMap(); + parameterMap.put("testParameter","testParameter"); + Assert.assertNull(ParameterUtils.convertParameterPlaceholders(null, parameterMap)); + + // parameterString、parameterMap is not null + String parameterString = "test_parameter"; + Assert.assertEquals(parameterString, ParameterUtils.convertParameterPlaceholders(parameterString, parameterMap)); + + //replace variable ${} form + parameterMap.put("testParameter2","${testParameter}"); + Assert.assertEquals(parameterString,PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true)); + + // replace time $[...] form, eg. $[yyyyMMdd] + Date cronTime = new Date(); + Assert.assertEquals(parameterString, replacePlaceholders(parameterString, cronTime, true)); + + // replace time $[...] form, eg. $[yyyyMMdd] + Date cronTimeStr = DateUtils.parseDate("20191220145900", new String[]{PARAMETER_FORMAT_TIME}); + Assert.assertEquals(parameterString, replacePlaceholders(parameterString, cronTimeStr, true)); + } + + /** + * Test curingGlobalParams + */ + @Test + public void testCuringGlobalParams() throws Exception { + //define globalMap + Map globalParamMap = new HashMap<>(); + globalParamMap.put("globalParams1","Params1"); + + //define globalParamList + List globalParamList = new ArrayList<>(); + + //define scheduleTime + Date scheduleTime = DateUtils.parseDate("20191220145900", new String[]{PARAMETER_FORMAT_TIME}); + + //test globalParamList is null + String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + Assert.assertNull(result); + Assert.assertNull(ParameterUtils.curingGlobalParams(null,null,CommandType.START_CURRENT_TASK_PROCESS,null)); + Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap,null,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime)); + + //test globalParamList is not null + Property property=new Property("testGlobalParam", Direct.IN, DataType.VARCHAR,"testGlobalParam"); + globalParamList.add(property); + + String result2 = ParameterUtils.curingGlobalParams(null,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime); + Assert.assertEquals(result2, JSONObject.toJSONString(globalParamList)); + + String result3 = ParameterUtils.curingGlobalParams(globalParamMap,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,null); + Assert.assertEquals(result3, JSONObject.toJSONString(globalParamList)); + + String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + Assert.assertEquals(result4, JSONObject.toJSONString(globalParamList)); + + //test var $ startsWith + globalParamMap.put("bizDate","${system.biz.date}"); + globalParamMap.put("b1zCurdate","${system.biz.curdate}"); + + + Property property2=new Property("testParamList1", Direct.IN, DataType.VARCHAR,"testParamList"); + Property property3=new Property("testParamList2", Direct.IN, DataType.VARCHAR,"{testParamList1}"); + Property property4=new Property("testParamList3", Direct.IN, DataType.VARCHAR,"${b1zCurdate}"); + + globalParamList.add(property2); + globalParamList.add(property3); + globalParamList.add(property4); + + String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + Assert.assertEquals(result5,JSONUtils.toJsonString(globalParamList)); + } + + /** + * Test handleEscapes + */ + @Test + public void testHandleEscapes() throws Exception { + Assert.assertNull(ParameterUtils.handleEscapes(null)); + Assert.assertEquals("",ParameterUtils.handleEscapes("")); + Assert.assertEquals("test Parameter",ParameterUtils.handleEscapes("test Parameter")); + Assert.assertEquals("////%test////%Parameter",ParameterUtils.handleEscapes("%test%Parameter")); + } + +} diff --git a/dolphinscheduler-dao/src/main/resources/application-dao.properties b/dolphinscheduler-dao/src/main/resources/application-dao.properties index 926d8dbb2f..07abe37835 100644 --- a/dolphinscheduler-dao/src/main/resources/application-dao.properties +++ b/dolphinscheduler-dao/src/main/resources/application-dao.properties @@ -19,12 +19,12 @@ spring.datasource.type=com.alibaba.druid.pool.DruidDataSource # postgre spring.datasource.driver-class-name=org.postgresql.Driver -spring.datasource.url=jdbc:postgresql://192.168.xx.xx:5432/dolphinscheduler +spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler # mysql #spring.datasource.driver-class-name=com.mysql.jdbc.Driver #spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 -spring.datasource.username=xx -spring.datasource.password=xx +spring.datasource.username=test +spring.datasource.password=test # connection configuration spring.datasource.initialSize=5 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java index 2eb48cd8d2..5ee5d01b7a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java @@ -72,7 +72,6 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { this.taskInstance.setState(ExecutionStatus.KILL); }else{ this.taskInstance.setState(subProcessInstance.getState()); - result = true; } } taskInstance.setEndTime(new Date()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java new file mode 100644 index 0000000000..ab30ce890f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.monitor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * abstract server monitor and auto restart server + */ +@Component +public abstract class AbstractMonitor implements Monitor { + + private static final Logger logger = LoggerFactory.getLogger(AbstractMonitor.class); + + + @Autowired + private RunConfig runConfig; + + /** + * monitor server and restart + */ + @Override + public void monitor(String masterPath,String workerPath,Integer port,String installPath) { + try { + restartServer(masterPath,port,installPath); + restartServer(workerPath,port,installPath); + }catch (Exception e){ + logger.error("server start up error",e); + } + } + + private void restartServer(String path,Integer port,String installPath) throws Exception{ + + String type = path.split("/")[2]; + String serverName = null; + String nodes = null; + if ("masters".equals(type)){ + serverName = "master-server"; + nodes = runConfig.getMasters(); + }else if ("workers".equals(type)){ + serverName = "worker-server"; + nodes = runConfig.getWorkers(); + } + + Map activeNodeMap = getActiveNodesByPath(path); + + Set needRestartServer = getNeedRestartServer(getRunConfigServer(nodes), + activeNodeMap.keySet()); + + for (String node : needRestartServer){ + // os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server') + String runCmd = "ssh -p " + port + " " + node + " sh " + installPath + "/bin/dolphinscheduler-daemon.sh start " + serverName; + Runtime.getRuntime().exec(runCmd); + } + } + + /** + * get need restart server + * @param deployedNodes deployedNodes + * @param activeNodes activeNodes + * @return need restart server + */ + private Set getNeedRestartServer(Set deployedNodes,Set activeNodes){ + if (CollectionUtils.isEmpty(activeNodes)){ + return deployedNodes; + } + + Set result = new HashSet<>(); + + result.addAll(deployedNodes); + result.removeAll(activeNodes); + + return result; + } + + /** + * run config masters/workers + * @return master set/worker set + */ + private Set getRunConfigServer(String nodes){ + Set nodeSet = new HashSet(); + + + if (StringUtils.isEmpty(nodes)){ + return null; + } + + String[] nodeArr = nodes.split(","); + + for (String node : nodeArr){ + nodeSet.add(node); + } + + return nodeSet; + } + + /** + * get active nodes by path + * @param path path + * @return active nodes + */ + protected abstract Map getActiveNodesByPath(String path); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java new file mode 100644 index 0000000000..3ee9488a3e --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.monitor; + +/** + * server monitor and auto restart server + */ +public interface Monitor { + + /** + * monitor server and restart + */ + void monitor(String masterPath, String workerPath, Integer port, String installPath); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/MonitorServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/MonitorServer.java new file mode 100644 index 0000000000..ac549bc386 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/MonitorServer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.monitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.ComponentScan; + +/** + * monitor server + */ +@ComponentScan("org.apache.dolphinscheduler") +public class MonitorServer implements CommandLineRunner { + + private static Integer ARGS_LENGTH = 4; + + private static final Logger logger = LoggerFactory.getLogger(MonitorServer.class); + + /** + * monitor + */ + @Autowired + private Monitor monitor; + + + + public static void main(String[] args) throws Exception{ + + new SpringApplicationBuilder(MonitorServer.class).web(WebApplicationType.NONE).run(args); + } + + @Override + public void run(String... args) throws Exception { + if (args.length != ARGS_LENGTH){ + logger.error("Usage: "); + return; + } + + String masterPath = args[0]; + String workerPath = args[1]; + Integer port = Integer.parseInt(args[2]); + String installPath = args[3]; + monitor.monitor(masterPath,workerPath,port,installPath); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RunConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RunConfig.java new file mode 100644 index 0000000000..419e9027ea --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RunConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.monitor; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Component; + +/** + * zookeeper conf + */ +@Component +@PropertySource("classpath:config/run_config.conf") +public class RunConfig { + + //zk connect config + @Value("${masters}") + private String masters; + + @Value("${workers}") + private String workers; + + @Value("${alertServer}") + private String alertServer; + + @Value("${apiServers}") + private String apiServers; + + @Value("${sshPort}") + private String sshPort; + + public String getMasters() { + return masters; + } + + public void setMasters(String masters) { + this.masters = masters; + } + + public String getWorkers() { + return workers; + } + + public void setWorkers(String workers) { + this.workers = workers; + } + + public String getAlertServer() { + return alertServer; + } + + public void setAlertServer(String alertServer) { + this.alertServer = alertServer; + } + + public String getApiServers() { + return apiServers; + } + + public void setApiServers(String apiServers) { + this.apiServers = apiServers; + } + + public String getSshPort() { + return sshPort; + } + + public void setSshPort(String sshPort) { + this.sshPort = sshPort; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java new file mode 100644 index 0000000000..927074012d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.monitor; + +import org.apache.dolphinscheduler.common.zk.ZookeeperOperator; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * zk monitor server impl + */ +@Component +public class ZKMonitorImpl extends AbstractMonitor { + + /** + * zookeeper operator + */ + @Autowired + private ZookeeperOperator zookeeperOperator; + + + /** + * get active nodes map by path + * @param path path + * @return active nodes map + */ + @Override + protected Map getActiveNodesByPath(String path) { + + Map maps = new HashMap<>(); + + List childrenList = zookeeperOperator.getChildrenKeys(path); + + if (childrenList == null){ + return maps; + } + + for (String child : childrenList){ + maps.put(child.split("_")[0],child); + } + + return maps; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index b6fcb2b40d..ab34ddfc2b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.commons.lang.StringUtils; import org.slf4j.LoggerFactory; @@ -44,9 +45,11 @@ public class FlinkArgsUtils { */ public static List buildArgs(FlinkParameters param) { List args = new ArrayList<>(); + String deployMode = "cluster"; - if (StringUtils.isNotEmpty(param.getDeployMode())) { - deployMode = param.getDeployMode(); + String tmpDeployMode = param.getDeployMode(); + if (StringUtils.isNotEmpty(tmpDeployMode)) { + deployMode = tmpDeployMode; } if (!"local".equals(deployMode)) { @@ -54,68 +57,70 @@ public class FlinkArgsUtils { args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster - - if (param.getSlot() != 0) { + int slot = param.getSlot(); + if (slot != 0) { args.add(Constants.FLINK_YARN_SLOT); - args.add(String.format("%d", param.getSlot())); //-ys + args.add(String.format("%d", slot)); //-ys } - if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm + String appName = param.getAppName(); + if (StringUtils.isNotEmpty(appName)) { //-ynm args.add(Constants.FLINK_APP_NAME); - args.add(param.getAppName()); + args.add(appName); } - if (param.getTaskManager() != 0) { //-yn + int taskManager = param.getTaskManager(); + if (taskManager != 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); - args.add(String.format("%d", param.getTaskManager())); + args.add(String.format("%d", taskManager)); } - if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { + String jobManagerMemory = param.getJobManagerMemory(); + if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); - args.add(param.getJobManagerMemory()); //-yjm + args.add(jobManagerMemory); //-yjm } - if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm + String taskManagerMemory = param.getTaskManagerMemory(); + if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm args.add(Constants.FLINK_TASK_MANAGE_MEM); - args.add(param.getTaskManagerMemory()); + args.add(taskManagerMemory); } args.add(Constants.FLINK_detach); //-d - } - if (param.getProgramType() != null) { - if (param.getProgramType() != ProgramType.PYTHON) { - if (StringUtils.isNotEmpty(param.getMainClass())) { - args.add(Constants.FLINK_MAIN_CLASS); //-c - args.add(param.getMainClass()); //main class - } - } + ProgramType programType = param.getProgramType(); + String mainClass = param.getMainClass(); + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + args.add(Constants.FLINK_MAIN_CLASS); //-c + args.add(param.getMainClass()); //main class } - if (param.getMainJar() != null) { - args.add(param.getMainJar().getRes()); + ResourceInfo mainJar = param.getMainJar(); + if (mainJar != null) { + args.add(mainJar.getRes()); } - if (StringUtils.isNotEmpty(param.getMainArgs())) { - args.add(param.getMainArgs()); + String mainArgs = param.getMainArgs(); + if (StringUtils.isNotEmpty(mainArgs)) { + args.add(mainArgs); } // --files --conf --libjar ... - if (StringUtils.isNotEmpty(param.getOthers())) { - String others = param.getOthers(); - if (!others.contains("--qu")) { - if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { - args.add(Constants.FLINK_QUEUE); - args.add(param.getQueue()); - } + String others = param.getOthers(); + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(others)) { + + if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { + args.add(Constants.FLINK_QUEUE); + args.add(param.getQueue()); } - args.add(param.getOthers()); - } else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { + args.add(others); + } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); - } return args; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java new file mode 100644 index 0000000000..7264c2f59d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.zk.ZookeeperOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.ComponentScan; + +@ComponentScan("org.apache.dolphinscheduler") +public class RemoveZKNode implements CommandLineRunner { + + private static Integer ARGS_LENGTH = 1; + + private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class); + + + /** + * zookeeper operator + */ + @Autowired + private ZookeeperOperator zookeeperOperator; + + public static void main(String[] args) { + + new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args); + } + + @Override + public void run(String... args) throws Exception { + + if (args.length != ARGS_LENGTH){ + logger.error("Usage: "); + return; + } + + zookeeperOperator.remove(args[0]); + zookeeperOperator.close(); + + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java new file mode 100644 index 0000000000..948e92cb24 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; + +/** + * sensitive log Util + */ +public class SensitiveLogUtil { + + /** + * @param dataSourcePwd data source password + * @return String + */ + public static String maskDataSourcePwd(String dataSourcePwd){ + + if (StringUtils.isNotEmpty(dataSourcePwd)) { + dataSourcePwd = Constants.PASSWORD_DEFAULT; + } + return dataSourcePwd; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java new file mode 100644 index 0000000000..be8d3d12a0 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.log; + + +import ch.qos.logback.classic.pattern.MessageConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * sensitive data log converter + */ +@Slf4j +public class SensitiveDataConverter extends MessageConverter { + + /** + * password pattern + */ + private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); + + + @Override + public String convert(ILoggingEvent event) { + + // get original log + String requestLogMsg = event.getFormattedMessage(); + + // desensitization log + return convertMsg(requestLogMsg); + } + + /** + * deal with sensitive log + * + * @param oriLogMsg original log + */ + private String convertMsg(final String oriLogMsg) { + + String tempLogMsg = oriLogMsg; + + if (StringUtils.isNotEmpty(tempLogMsg)) { + tempLogMsg = passwordHandler(pwdPattern, tempLogMsg); + } + return tempLogMsg; + } + + /** + * password regex + * + * @param logMsg original log + */ + private String passwordHandler(Pattern pwdPattern, String logMsg) { + + Matcher matcher = pwdPattern.matcher(logMsg); + + StringBuffer sb = new StringBuffer(logMsg.length()); + + while (matcher.find()) { + + String password = matcher.group(); + + String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); + + matcher.appendReplacement(sb, maskPassword); + } + matcher.appendTail(sb); + + return sb.toString(); + } + + +} diff --git a/script/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf similarity index 100% rename from script/config/install_config.conf rename to dolphinscheduler-server/src/main/resources/config/install_config.conf diff --git a/script/config/run_config.conf b/dolphinscheduler-server/src/main/resources/config/run_config.conf similarity index 100% rename from script/config/run_config.conf rename to dolphinscheduler-server/src/main/resources/config/run_config.conf diff --git a/dolphinscheduler-server/src/main/resources/worker_logback.xml b/dolphinscheduler-server/src/main/resources/worker_logback.xml index 64d85d4565..7ba0c9b8ab 100644 --- a/dolphinscheduler-server/src/main/resources/worker_logback.xml +++ b/dolphinscheduler-server/src/main/resources/worker_logback.xml @@ -18,6 +18,8 @@ + @@ -31,7 +33,7 @@ INFO - + taskAppId ${log.base} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java new file mode 100644 index 0000000000..710d2c2505 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Test FlinkArgsUtils + */ +public class FlinkArgsUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class); + + public String mode = "cluster"; + public int slot = 2; + public String appName = "testFlink"; + public int taskManager = 4; + public String taskManagerMemory = "2G"; + public String jobManagerMemory = "4G"; + public ProgramType programType = ProgramType.JAVA; + public String mainClass = "com.test"; + public ResourceInfo mainJar = null; + public String mainArgs = "testArgs"; + public String queue = "queue1"; + public String others = "--input file:///home"; + + + @Before + public void setUp() throws Exception { + + ResourceInfo main = new ResourceInfo(); + main.setRes("testflink-1.0.0-SNAPSHOT.jar"); + mainJar = main; + } + + /** + * Test buildArgs + */ + @Test + public void testBuildArgs() { + + //Define params + FlinkParameters param = new FlinkParameters(); + param.setDeployMode(mode); + param.setMainClass(mainClass); + param.setAppName(appName); + param.setSlot(slot); + param.setTaskManager(taskManager); + param.setJobManagerMemory(jobManagerMemory); + param.setTaskManagerMemory(taskManagerMemory); + param.setMainJar(mainJar); + param.setProgramType(programType); + param.setMainArgs(mainArgs); + param.setQueue(queue); + param.setOthers(others); + + //Invoke buildArgs + List result = FlinkArgsUtils.buildArgs(param); + for (String s : result) { + logger.info(s); + } + + //Expected values and order + assertEquals(result.size(),20); + + assertEquals(result.get(0),"-m"); + assertEquals(result.get(1),"yarn-cluster"); + + assertEquals(result.get(2),"-ys"); + assertSame(Integer.valueOf(result.get(3)),slot); + + assertEquals(result.get(4),"-ynm"); + assertEquals(result.get(5),appName); + + assertEquals(result.get(6),"-yn"); + assertSame(Integer.valueOf(result.get(7)),taskManager); + + assertEquals(result.get(8),"-yjm"); + assertEquals(result.get(9),jobManagerMemory); + + assertEquals(result.get(10),"-ytm"); + assertEquals(result.get(11),taskManagerMemory); + + assertEquals(result.get(12),"-d"); + + assertEquals(result.get(13),"-c"); + assertEquals(result.get(14),mainClass); + + assertEquals(result.get(15),mainJar.getRes()); + assertEquals(result.get(16),mainArgs); + + assertEquals(result.get(17),"--qu"); + assertEquals(result.get(18),queue); + + assertEquals(result.get(19),others); + + //Others param without --qu + FlinkParameters param1 = new FlinkParameters(); + param1.setQueue(queue); + param1.setDeployMode(mode); + result = FlinkArgsUtils.buildArgs(param1); + assertEquals(result.size(),5); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java new file mode 100644 index 0000000000..2e5bfcf3e5 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + + +import org.apache.dolphinscheduler.common.Constants; +import org.junit.Assert; +import org.junit.Test; + + +public class SensitiveLogUtilTest { + + @Test + public void testMaskDataSourcePwd() { + + String password = "123456"; + String emptyPassword = ""; + + Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtil.maskDataSourcePwd(password)); + Assert.assertEquals("", SensitiveLogUtil.maskDataSourcePwd(emptyPassword)); + + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java new file mode 100644 index 0000000000..fb564a22fb --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.log; + + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SensitiveDataConverterTest { + + private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class); + + /** + * password pattern + */ + private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); + + + /** + * mask sensitive logMsg - sql task datasource password + */ + @Test + public void testPwdLogMsgConverter() { + + String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"view1\"}"; + + String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"******\"}"; + + + logger.info("parameter : {}", logMsg); + logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg)); + + Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg)); + Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); + + } + + /** + * password regex test + * + * @param logMsg original log + */ + private static String passwordHandler(Pattern pattern, String logMsg) { + + Matcher matcher = pattern.matcher(logMsg); + + StringBuffer sb = new StringBuffer(logMsg.length()); + + while (matcher.find()) { + + String password = matcher.group(); + + String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); + + matcher.appendReplacement(sb, maskPassword); + } + matcher.appendTail(sb); + + return sb.toString(); + } + + +} diff --git a/install.sh b/install.sh index 3699974e00..b73ca09079 100644 --- a/install.sh +++ b/install.sh @@ -469,8 +469,8 @@ sh ${workDir}/script/stop-all.sh # 4,delete zk node echo "4,delete zk node" -sleep 1 -python ${workDir}/script/del-zk-node.py $zkQuorum $zkRoot + +sh ${workDir}/script/remove-zk-node.sh $zkRoot # 5,scp resources echo "5,scp resources" @@ -485,29 +485,4 @@ fi # 6,startup echo "6,startup" -sh ${workDir}/script/start-all.sh - -# 7,start monitoring self-starting script -monitor_pid=${workDir}/monitor_server.pid -if [ "true" = $monitorServerState ];then - if [ -f $monitor_pid ]; then - TARGET_PID=`cat $monitor_pid` - if kill -0 $TARGET_PID > /dev/null 2>&1; then - echo "monitor server running as process ${TARGET_PID}.Stopping" - kill $TARGET_PID - sleep 5 - if kill -0 $TARGET_PID > /dev/null 2>&1; then - echo "monitor server did not stop gracefully after 5 seconds: killing with kill -9" - kill -9 $TARGET_PID - fi - else - echo "no monitor server to stop" - fi - echo "monitor server running as process ${TARGET_PID}.Stopped success" - rm -f $monitor_pid - fi - nohup python -u ${workDir}/script/monitor-server.py $installPath $zkQuorum $zkMasters $zkWorkers > ${workDir}/monitor-server.log 2>&1 & - echo $! > $monitor_pid - echo "start monitor server success as process `cat $monitor_pid`" - -fi \ No newline at end of file +sh ${workDir}/script/start-all.sh \ No newline at end of file diff --git a/pom.xml b/pom.xml index e0f4e9303d..83a6b45e10 100644 --- a/pom.xml +++ b/pom.xml @@ -655,6 +655,8 @@ **/alert/utils/JSONUtilsTest.java **/alert/utils/PropertyUtilsTest.java **/server/utils/SparkArgsUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/dao/mapper/AccessTokenMapperTest.java diff --git a/script/del-zk-node.py b/script/del-zk-node.py deleted file mode 100644 index 57034a5164..0000000000 --- a/script/del-zk-node.py +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import time -import sys -from kazoo.client import KazooClient - -class ZkClient: - def __init__(self): - self.zk = KazooClient(hosts=sys.argv[1]) - self.zk.start() - def del_node(self): - self.zk.delete(sys.argv[2], recursive=True) - print('deleted success') - def __del__(self): - self.zk.stop() -if __name__ == '__main__': - zkclient = ZkClient() - zkclient.del_node() - time.sleep(2) diff --git a/script/monitor-server.py b/script/monitor-server.py deleted file mode 100644 index 26fcd87de6..0000000000 --- a/script/monitor-server.py +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env python -# -*- coding:utf-8 -*- -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -''' -1, yum install pip -yum -y install python-pip - -2, pip install kazoo -pip install kazoo - -or - -3, conda install kazoo -conda install -c conda-forge kazoo - -run script and parameter description: -nohup python -u monitor_server.py /data1_1T/dolphinscheduler 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 /dolphinscheduler/masters /dolphinscheduler/workers> monitor_server.log 2>&1 & -the parameters are as follows: -/data1_1T/dolphinscheduler : the value comes from the installPath in install.sh -192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 : the value comes from zkQuorum in install.sh -the value comes from zkWorkers in install.sh -/dolphinscheduler/masters : the value comes from zkMasters in install.sh -/dolphinscheduler/workers : the value comes from zkWorkers in install.sh -''' -import sys -import socket -import os -import sched -import time -from datetime import datetime -from kazoo.client import KazooClient - -schedule = sched.scheduler(time.time, time.sleep) - -class ZkClient: - def __init__(self): - # hosts configuration zk address cluster - self.zk = KazooClient(hosts=zookeepers) - self.zk.start() - - # read configuration files and assemble them into a dictionary - def read_file(self,path): - with open(path, 'r') as f: - dict = {} - for line in f.readlines(): - arr = line.strip().split('=') - if (len(arr) == 2): - dict[arr[0]] = arr[1] - return dict - - # get the ip address according to hostname - def get_ip_by_hostname(self,hostname): - return socket.gethostbyname(hostname) - - # restart server - def restart_server(self,inc): - config_dict = self.read_file(install_path + '/conf/config/run_config.conf') - - master_list = config_dict.get('masters').split(',') - print master_list - master_list = list(map(lambda item : self.get_ip_by_hostname(item),master_list)) - - worker_list = config_dict.get('workers').split(',') - print worker_list - worker_list = list(map(lambda item: self.get_ip_by_hostname(item), worker_list)) - - ssh_port = config_dict.get("sshPort") - print ssh_port - - if (self.zk.exists(masters_zk_path)): - zk_master_list = [] - zk_master_nodes = self.zk.get_children(masters_zk_path) - for zk_master_node in zk_master_nodes: - zk_master_list.append(zk_master_node.split('_')[0]) - restart_master_list = list(set(master_list) - set(zk_master_list)) - if (len(restart_master_list) != 0): - for master in restart_master_list: - print("master " + self.get_ip_by_hostname(master) + " server has down") - os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server') - - if (self.zk.exists(workers_zk_path)): - zk_worker_list = [] - zk_worker_nodes = self.zk.get_children(workers_zk_path) - for zk_worker_node in zk_worker_nodes: - zk_worker_list.append(zk_worker_node.split('_')[0]) - restart_worker_list = list(set(worker_list) - set(zk_worker_list)) - if (len(restart_worker_list) != 0): - for worker in restart_worker_list: - print("worker " + self.get_ip_by_hostname(worker) + " server has down") - os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(worker) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start worker-server') - - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - schedule.enter(inc, 0, self.restart_server, (inc,)) - # default parameter 60s - def main(self,inc=60): - # the enter four parameters are: interval event, priority (sequence for simultaneous execution of two events arriving at the same time), function triggered by the call, - # the argument to the trigger function (tuple form) - schedule.enter(0, 0, self.restart_server, (inc,)) - schedule.run() -if __name__ == '__main__': - if (len(sys.argv) < 4): - print('please input install_path,zookeepers,masters_zk_path and worker_zk_path') - install_path = sys.argv[1] - zookeepers = sys.argv[2] - masters_zk_path = sys.argv[3] - workers_zk_path = sys.argv[4] - zkClient = ZkClient() - zkClient.main(300) \ No newline at end of file diff --git a/script/monitor-server.sh b/script/monitor-server.sh new file mode 100644 index 0000000000..05d46048aa --- /dev/null +++ b/script/monitor-server.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +print_usage(){ + printf $"USAGE:$0 masterPath workerPath port installPath\n" + exit 1 +} + +if [ $# -ne 4 ];then + print_usage +fi + +masterPath=$1 +workerPath=$2 +port=$3 +installPath=$4 + + +BIN_DIR=`dirname $0` +BIN_DIR=`cd "$BIN_DIR"; pwd` +DOLPHINSCHEDULER_HOME=$BIN_DIR/.. + +export JAVA_HOME=$JAVA_HOME + + +export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf +export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/* + +export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70" +export STOP_TIMEOUT=5 + +CLASS=org.apache.dolphinscheduler.server.monitor.MonitorServer + +exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $masterPath $workerPath $port $installPath" + +cd $DOLPHINSCHEDULER_HOME +$JAVA_HOME/bin/java $exec_command diff --git a/script/remove-zk-node.sh b/script/remove-zk-node.sh new file mode 100644 index 0000000000..1fff1d5597 --- /dev/null +++ b/script/remove-zk-node.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +print_usage(){ + printf $"USAGE:$0 rootNode\n" + exit 1 +} + +if [ $# -ne 1 ];then + print_usage +fi + +rootNode=$1 + +BIN_DIR=`dirname $0` +BIN_DIR=`cd "$BIN_DIR"; pwd` +DOLPHINSCHEDULER_HOME=$BIN_DIR/.. + +export JAVA_HOME=$JAVA_HOME + + +export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf +export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/* + +export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70" +export STOP_TIMEOUT=5 + +CLASS=org.apache.dolphinscheduler.server.utils.RemoveZKNode + +exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $rootNode" + +cd $DOLPHINSCHEDULER_HOME +$JAVA_HOME/bin/java $exec_command