Browse Source

Polish config, expose rpc config in application.yml (#14501)

3.2.1-prepare
Wenjun Ruan 12 months ago committed by GitHub
parent
commit
0246327083
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      docs/docs/en/architecture/configuration.md
  2. 10
      docs/docs/zh/architecture/configuration.md
  3. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
  4. 90
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java
  5. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java
  6. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java
  7. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
  8. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java
  9. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java
  10. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  11. 76
      dolphinscheduler-api/src/main/resources/application.yaml
  12. 32
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java
  13. 37
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java
  14. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java
  15. 38
      dolphinscheduler-api/src/test/resources/application.yaml
  16. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  17. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  18. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  19. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java
  20. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  21. 61
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  22. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  23. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
  24. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

10
docs/docs/en/architecture/configuration.md

@ -259,11 +259,11 @@ Location: `api-server/conf/application.yaml`
|security.authentication.ldap.ssl.enable|false|LDAP switch|
|security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks file absolute path|
|security.authentication.ldap.ssl.trust-store-password|password|LDAP jks password|
|traffic.control.global.switch|false|traffic control global switch|
|traffic.control.max-global-qps-rate|300|global max request number per second|
|traffic.control.tenant-switch|false|traffic control tenant switch|
|traffic.control.default-tenant-qps-rate|10|default tenant max request number per second|
|traffic.control.customize-tenant-qps-rate||customize tenant max request number per second|
|api.traffic.control.global.switch|false|traffic control global switch|
|api.traffic.control.max-global-qps-rate|300|global max request number per second|
|api.traffic.control.tenant-switch|false|traffic control tenant switch|
|api.traffic.control.default-tenant-qps-rate|10|default tenant max request number per second|
|api.traffic.control.customize-tenant-qps-rate||customize tenant max request number per second|
### Master Server related configuration

10
docs/docs/zh/architecture/configuration.md

@ -257,11 +257,11 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|security.authentication.ldap.ssl.enable|false|LDAP ssl开关|
|security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks文件绝对路径|
|security.authentication.ldap.ssl.trust-store-password|password|LDAP jks密码|
|traffic.control.global.switch|false|流量控制全局开关|
|traffic.control.max-global-qps-rate|300|全局最大请求数/秒|
|traffic.control.tenant-switch|false|流量控制租户开关|
|traffic.control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制|
|traffic.control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制|
|api.traffic.control.global.switch|false|流量控制全局开关|
|api.traffic.control.max-global-qps-rate|300|全局最大请求数/秒|
|api.traffic.control.tenant-switch|false|流量控制租户开关|
|api.traffic.control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制|
|api.traffic.control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制|
## Master Server相关配置

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.audit;
import org.apache.dolphinscheduler.api.configuration.AuditConfiguration;
import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -34,20 +34,20 @@ import org.springframework.stereotype.Component;
@Slf4j
public class AuditPublishService {
private BlockingQueue<AuditMessage> auditMessageQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<AuditMessage> auditMessageQueue = new LinkedBlockingQueue<>();
@Autowired
private List<AuditSubscriber> subscribers;
@Autowired
private AuditConfiguration auditConfiguration;
private ApiConfig apiConfig;
/**
* create a daemon thread to process the message queue
*/
@PostConstruct
private void init() {
if (auditConfiguration.getEnabled()) {
if (apiConfig.isAuditEnable()) {
Thread thread = new Thread(this::doPublish);
thread.setDaemon(true);
thread.setName("Audit-Log-Consume-Thread");
@ -61,7 +61,7 @@ public class AuditPublishService {
* @param message audit message
*/
public void publish(AuditMessage message) {
if (auditConfiguration.getEnabled() && !auditMessageQueue.offer(message)) {
if (apiConfig.isAuditEnable() && !auditMessageQueue.offer(message)) {
log.error("Publish audit message failed, message:{}", message);
}
}

90
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.configuration;
import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
@Slf4j
@Data
@Validated
@Configuration
@ConfigurationProperties(value = "api")
public class ApiConfig implements Validator {
private boolean auditEnable = false;
private TrafficConfiguration trafficControl = new TrafficConfiguration();
private PythonGatewayConfiguration pythonGateway = new PythonGatewayConfiguration();
@Override
public boolean supports(Class<?> clazz) {
return ApiConfig.class.isAssignableFrom(clazz);
}
@Override
public void validate(Object target, Errors errors) {
printConfig();
}
private void printConfig() {
log.info("API config: auditEnable -> {} ", auditEnable);
log.info("API config: trafficControl -> {} ", trafficControl);
log.info("API config: pythonGateway -> {} ", pythonGateway);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class TrafficConfiguration {
private boolean globalSwitch = false;
private Integer maxGlobalQpsRate = 300;
private boolean tenantSwitch = false;
private Integer defaultTenantQpsRate = 10;
private Map<String, Integer> customizeTenantQpsRate = new HashMap<>();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class PythonGatewayConfiguration {
private boolean enabled = true;
private String gatewayServerAddress = "0.0.0.0";
private int gatewayServerPort = 25333;
private String pythonAddress = "127.0.0.1";
private int pythonPort = 25334;
private int connectTimeout = 0;
private int readTimeout = 0;
private String authToken = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc";
}
}

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java

@ -50,7 +50,7 @@ public class AppConfiguration implements WebMvcConfigurer {
public static final String LOCALE_LANGUAGE_COOKIE = "language";
@Autowired
private TrafficConfiguration trafficConfiguration;
private ApiConfig apiConfig;
@Bean
public CorsFilter corsFilter() {
@ -90,14 +90,15 @@ public class AppConfiguration implements WebMvcConfigurer {
@Bean
public RateLimitInterceptor createRateLimitInterceptor() {
return new RateLimitInterceptor(trafficConfiguration);
return new RateLimitInterceptor(apiConfig.getTrafficControl());
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
// i18n
registry.addInterceptor(localeChangeInterceptor());
if (trafficConfiguration.isGlobalSwitch() || trafficConfiguration.isTenantSwitch()) {
ApiConfig.TrafficConfiguration trafficControl = apiConfig.getTrafficControl();
if (trafficControl.isGlobalSwitch() || trafficControl.isTenantSwitch()) {
registry.addInterceptor(createRateLimitInterceptor());
}
registry.addInterceptor(loginInterceptor())

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.configuration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@EnableConfigurationProperties
@ConfigurationProperties(value = "audit", ignoreUnknownFields = false)
public class AuditConfiguration {
private boolean enabled;
public boolean getEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.configuration;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(value = "python-gateway")
public class PythonGatewayConfiguration {
private boolean enabled;
private String gatewayServerAddress;
private int gatewayServerPort;
private String pythonAddress;
private int pythonPort;
private int connectTimeout;
private int readTimeout;
private String authToken;
}

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.configuration;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "traffic.control")
public class TrafficConfiguration {
private boolean globalSwitch;
private Integer maxGlobalQpsRate = 300;
private boolean tenantSwitch;
private Integer defaultTenantQpsRate = 10;
private Map<String, Integer> customizeTenantQpsRate = new HashMap<>();
}

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.interceptor;
import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration;
import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@ -47,7 +47,7 @@ import com.google.common.util.concurrent.RateLimiter;
@Slf4j
public class RateLimitInterceptor implements HandlerInterceptor {
private TrafficConfiguration trafficConfiguration;
private ApiConfig.TrafficConfiguration trafficConfiguration;
private RateLimiter globalRateLimiter;
@ -98,7 +98,7 @@ public class RateLimitInterceptor implements HandlerInterceptor {
return true;
}
public RateLimitInterceptor(TrafficConfiguration trafficConfiguration) {
public RateLimitInterceptor(ApiConfig.TrafficConfiguration trafficConfiguration) {
this.trafficConfiguration = trafficConfiguration;
if (trafficConfiguration.isGlobalSwitch()) {
this.globalRateLimiter =

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.python;
import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.enums.Status;
@ -145,7 +145,7 @@ public class PythonGateway {
private DataSourceMapper dataSourceMapper;
@Autowired
private PythonGatewayConfiguration pythonGatewayConfiguration;
private ApiConfig apiConfig;
@Autowired
private ProjectUserMapper projectUserMapper;
@ -689,13 +689,14 @@ public class PythonGateway {
@PostConstruct
public void init() {
if (pythonGatewayConfiguration.isEnabled()) {
if (apiConfig.getPythonGateway().isEnabled()) {
this.start();
}
}
private void start() {
try {
ApiConfig.PythonGatewayConfiguration pythonGatewayConfiguration = apiConfig.getPythonGateway();
InetAddress gatewayHost = InetAddress.getByName(pythonGatewayConfiguration.getGatewayServerAddress());
GatewayServerBuilder serverBuilder = new GatewayServer.GatewayServerBuilder()
.entryPoint(this)

76
dolphinscheduler-api/src/main/resources/application.yaml

@ -115,35 +115,46 @@ registry:
block-until-connected: 600ms
digest: ~
audit:
enabled: false
api:
audit-enable: false
# Traffic control, if you turn on this config, the maximum number of request/s will be limited.
# global max request number per second
# default tenant-level max request number
traffic-control:
global-switch: false
max-global-qps-rate: 300
tenant-switch: false
default-tenant-qps-rate: 10
#customize-tenant-qps-rate:
# eg.
#tenant1: 11
#tenant2: 20
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: true
# Authentication token for connection from python api to python gateway server. Should be changed the default value
# when you deploy in public network.
auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0
metrics:
enabled: true
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: true
# Authentication token for connection from python api to python gateway server. Should be changed the default value
# when you deploy in public network.
auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0
security:
authentication:
# Authentication types (supported types: PASSWORD,LDAP,CASDOOR_SSO)
@ -168,21 +179,6 @@ security:
trust-store: "/ldapkeystore.jks"
trust-store-password: "password"
# Traffic control, if you turn on this config, the maximum number of request/s will be limited.
# global max request number per second
# default tenant-level max request number
traffic:
control:
global-switch: false
max-global-qps-rate: 300
tenant-switch: false
default-tenant-qps-rate: 10
#customize-tenant-qps-rate:
# eg.
#tenant1: 11
#tenant2: 20
# Override by profile
---

32
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java

@ -25,33 +25,33 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
public class TrafficConfigurationTest extends AbstractControllerTest {
public class ApiConfigTest extends AbstractControllerTest {
@Autowired
private TrafficConfiguration trafficConfiguration;
private ApiConfig apiConfig;
@Test
public void isTrafficGlobalControlSwitch() {
Assertions.assertFalse(trafficConfiguration.isGlobalSwitch());
public void testIsAuditEnable() {
Assertions.assertTrue(apiConfig.isAuditEnable());
}
@Test
public void getMaxGlobalQpsLimit() {
Assertions.assertEquals(300, (int) trafficConfiguration.getMaxGlobalQpsRate());
}
public void testGetTrafficControlConfig() {
ApiConfig.TrafficConfiguration trafficControl = apiConfig.getTrafficControl();
@Test
public void isTrafficTenantControlSwitch() {
Assertions.assertFalse(trafficConfiguration.isTenantSwitch());
}
Assertions.assertFalse(trafficControl.isGlobalSwitch());
Assertions.assertEquals(299, (int) trafficControl.getMaxGlobalQpsRate());
Assertions.assertFalse(trafficControl.isTenantSwitch());
Assertions.assertEquals(9, (int) trafficControl.getDefaultTenantQpsRate());
Assertions.assertTrue(MapUtils.isEmpty(trafficControl.getCustomizeTenantQpsRate()));
@Test
public void getDefaultTenantQpsLimit() {
Assertions.assertEquals(10, (int) trafficConfiguration.getDefaultTenantQpsRate());
}
@Test
public void getCustomizeTenantQpsRate() {
Assertions.assertTrue(MapUtils.isEmpty(trafficConfiguration.getCustomizeTenantQpsRate()));
public void testGetPythonGateway() {
ApiConfig.PythonGatewayConfiguration pythonGateway = apiConfig.getPythonGateway();
Assertions.assertFalse(pythonGateway.isEnabled());
}
}

37
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@ActiveProfiles("audit")
@SpringBootTest(classes = AuditConfiguration.class)
public class AuditConfigurationTest {
@Autowired
private AuditConfiguration auditConfiguration;
@Test
public void isAuditGlobalControlSwitch() {
Assertions.assertTrue(auditConfiguration.getEnabled());
}
}

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.interceptor;
import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration;
import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import java.util.HashMap;
import java.util.Map;
@ -39,14 +39,14 @@ public class RateLimitInterceptorTest {
public void testPreHandleWithoutControl() throws ExecutionException {
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new TrafficConfiguration());
RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new ApiConfig.TrafficConfiguration());
Assertions.assertTrue(rateLimitInterceptor.preHandle(request, response, null));
Assertions.assertTrue(rateLimitInterceptor.preHandle(request, response, null));
}
@Test
public void testPreHandleWithTenantLevenControl() throws ExecutionException {
TrafficConfiguration trafficConfiguration = new TrafficConfiguration();
ApiConfig.TrafficConfiguration trafficConfiguration = new ApiConfig.TrafficConfiguration();
trafficConfiguration.setTenantSwitch(true);
Map<String, Integer> map = new HashMap<>();
map.put("tenant1", 2);
@ -70,7 +70,7 @@ public class RateLimitInterceptorTest {
@Test
public void testPreHandleWithGlobalControl() throws ExecutionException {
TrafficConfiguration trafficConfiguration = new TrafficConfiguration();
ApiConfig.TrafficConfiguration trafficConfiguration = new ApiConfig.TrafficConfiguration();
trafficConfiguration.setTenantSwitch(true);
trafficConfiguration.setGlobalSwitch(true);
trafficConfiguration.setMaxGlobalQpsRate(3);

38
dolphinscheduler-api/src/test/resources/application.yaml

@ -27,5 +27,39 @@ spring:
registry:
type: zookeeper
audit:
enabled: true
api:
audit-enable: true
# Traffic control, if you turn on this config, the maximum number of request/s will be limited.
# global max request number per second
# default tenant-level max request number
traffic-control:
global-switch: false
max-global-qps-rate: 299
tenant-switch: false
default-tenant-qps-rate: 9
#customize-tenant-qps-rate:
# eg.
#tenant1: 11
#tenant2: 20
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: false
# Authentication token for connection from python api to python gateway server. Should be changed the default value
# when you deploy in public network.
auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
@ -73,6 +74,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterRPCServer masterRPCServer;
@Autowired
private MasterRpcClient masterRpcClient;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@ -85,6 +89,7 @@ public class MasterServer implements IStoppable {
public void run() throws SchedulerException {
// init rpc server
this.masterRPCServer.start();
this.masterRpcClient.start();
// install task plugin
this.taskPluginManager.loadPlugin();
@ -125,6 +130,7 @@ public class MasterServer implements IStoppable {
SchedulerApi closedSchedulerApi = schedulerApi;
MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
MasterRPCServer closedRpcServer = masterRPCServer;
MasterRpcClient closedRpcClient = masterRpcClient;
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -97,6 +99,10 @@ public class MasterConfig implements Validator {
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
private NettyClientConfig masterRpcClientConfig = new NettyClientConfig();
private NettyServerConfig masterRpcServerConfig = new NettyServerConfig();
// ip:listenPort
private String masterAddress;
@ -177,5 +183,7 @@ public class MasterConfig implements Validator {
log.info("Master config: masterAddress -> {} ", masterAddress);
log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
log.info("Master config: workerGroupRefreshInterval -> {} ", workerGroupRefreshInterval);
log.info("Master config: masterRpcServerConfig -> {} ", masterRpcServerConfig);
log.info("Master config: masterRpcClientConfig -> {} ", masterRpcClientConfig);
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -47,7 +47,7 @@ public class MasterRPCServer implements AutoCloseable {
public void start() {
log.info("Starting Master RPC Server...");
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
NettyServerConfig serverConfig = masterConfig.getMasterRpcServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
for (MasterRpcProcessor masterRpcProcessor : masterRpcProcessors) {

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java

@ -19,25 +19,28 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MasterRpcClient {
public class MasterRpcClient implements AutoCloseable {
private final NettyRemotingClient client;
@Autowired
private MasterConfig masterConfig;
private static final long DEFAULT_TIME_OUT_MILLS = 10_000L;
private NettyRemotingClient client;
public MasterRpcClient() {
client = new NettyRemotingClient(new NettyClientConfig());
public void start() {
client = new NettyRemotingClient(masterConfig.getMasterRpcClientConfig());
log.info("Success initialized MasterRPCClient...");
}
@ -46,7 +49,15 @@ public class MasterRpcClient {
return client.sendSync(host, rpcMessage, DEFAULT_TIME_OUT_MILLS);
}
public void send(Host of, Message message) throws RemotingException {
client.send(of, message);
public void send(@NonNull Host host, @NonNull Message message) throws RemotingException {
client.send(host, message);
}
@Override
public void close() {
if (client != null) {
client.close();
}
}
}

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -47,7 +47,6 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -244,19 +243,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
private final CuringParamsService curingParamsService;
private final String masterAddress;
private final DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory;
/**
* @param processInstance processInstance
* @param processService processService
* @param processInstanceDao processInstanceDao
* @param masterRpcClient masterRpcClient
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
private final MasterConfig masterConfig;
public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance,
@NonNull CommandService commandService,
@ -275,12 +265,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
this.masterRpcClient = masterRpcClient;
this.masterConfig = masterConfig;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
this.taskInstanceDao = taskInstanceDao;
this.taskDefinitionLogDao = taskDefinitionLogDao;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory;
this.processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
@ -480,7 +470,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
taskInstance.getState());
this.updateProcessInstanceState();
sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(), taskInstance.getHost());
sendTaskLogOnMasterToRemoteIfNeeded(taskInstance);
} catch (Exception ex) {
log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskSet", ex);
// remove the task from complete map, so that we can finish in the next time.
@ -1424,7 +1414,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
try {
Message message =
masterRpcClient.sendSyncCommand(Host.of(taskInstance.getHost()),
new WorkflowHostChangeRequest(taskInstance.getId(), masterAddress).convert2Command());
new WorkflowHostChangeRequest(taskInstance.getId(), masterConfig.getMasterAddress())
.convert2Command());
if (message == null) {
log.error(
"Takeover task instance failed, the worker {} might not be alive, will try to create a new task instance",
@ -2237,17 +2228,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
}
private void sendTaskLogOnMasterToRemoteIfNeeded(String logPath, String host) {
if (RemoteLogUtils.isRemoteLoggingEnable() && isExecutedOnMaster(host)) {
RemoteLogUtils.sendRemoteLog(logPath);
log.info("Master sends task log {} to remote storage asynchronously.", logPath);
private void sendTaskLogOnMasterToRemoteIfNeeded(TaskInstance taskInstance) {
if (RemoteLogUtils.isRemoteLoggingEnable() && TaskUtils.isMasterTask(taskInstance.getTaskType())) {
RemoteLogUtils.sendRemoteLog(taskInstance.getLogPath());
log.info("Master sends task log {} to remote storage asynchronously.", taskInstance.getLogPath());
}
}
private boolean isExecutedOnMaster(String host) {
return host.endsWith(masterAddress.split(Constants.COLON)[1]);
}
private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
String taskVarPoolJson = taskInstance.getVarPool();
if (StringUtils.isEmpty(taskVarPoolJson)) {

61
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -181,28 +181,42 @@ alert:
wait-timeout: 0
heartbeat-interval: 60s
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: true
# Authentication token for connection from python api to python gateway server. Should be changed the default value
# when you deploy in public network.
auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0
api:
audit-enable: false
# Traffic control, if you turn on this config, the maximum number of request/s will be limited.
# global max request number per second
# default tenant-level max request number
traffic-control:
global-switch: false
max-global-qps-rate: 300
tenant-switch: false
default-tenant-qps-rate: 10
#customize-tenant-qps-rate:
# eg.
#tenant1: 11
#tenant2: 20
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: true
# Authentication token for connection from python api to python gateway server. Should be changed the default value
# when you deploy in public network.
auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0
server:
port: 12345
@ -234,9 +248,6 @@ management:
tags:
application: ${spring.application.name}
audit:
enabled: true
metrics:
enabled: true

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -21,6 +21,8 @@ import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DO
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import java.time.Duration;
@ -50,6 +52,9 @@ public class WorkerConfig implements Validator {
private double reservedMemory = 0.1;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
private NettyClientConfig workerRpcClientConfig = new NettyClientConfig();
private NettyServerConfig workerRpcServerConfig = new NettyServerConfig();
/**
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java

@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.processor.WorkerRpcProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import java.util.List;
@ -43,16 +43,17 @@ public class WorkerRpcClient implements AutoCloseable {
@Lazy
private List<WorkerRpcProcessor> workerRpcProcessors;
@Autowired
private WorkerConfig workerConfig;
private NettyRemotingClient nettyRemotingClient;
public void start() {
log.info("Worker rpc client starting");
NettyClientConfig nettyClientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig);
this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());
// we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer.
for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) {
this.nettyRemotingClient.registerProcessor(workerRpcProcessor);
}
log.info("Worker rpc client started");
}

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -44,7 +44,7 @@ public class WorkerRpcServer implements Closeable {
public void start() {
log.info("Worker rpc server starting...");
NettyServerConfig serverConfig = new NettyServerConfig();
NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
nettyRemotingServer = new NettyRemotingServer(serverConfig);
for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) {

Loading…
Cancel
Save