Browse Source

Merge remote-tracking branch 'origin/dev' into json_split_two

# Conflicts:
#	.github/actions/translate-on-issue
#	docker/build/conf/dolphinscheduler/registry.properties.tpl
#	docker/build/startup-init-conf.sh
#	docker/docker-swarm/config.env.sh
#	docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
#	dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
#	dolphinscheduler-service/src/main/resources/registry.properties
2.0.7-release
CalvinKirs 3 years ago
parent
commit
fc203fb41c
  1. 8
      docker/build/conf/dolphinscheduler/registry.properties.tpl
  2. 2
      docker/build/startup-init-conf.sh
  3. 5
      docker/docker-swarm/config.env.sh
  4. 24
      docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
  5. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java
  6. 74
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  7. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
  8. 16
      dolphinscheduler-service/src/main/resources/registry.properties

8
docker/build/conf/dolphinscheduler/registry.properties.tpl

@ -15,9 +15,13 @@
# limitations under the License. # limitations under the License.
# #
registry.plugin.name=${REGISTRY_PLUGIN_NAME} #registry.plugin.dir config the Registry Plugin dir.
registry.plugin.dir=${REGISTRY_PLUGIN_DIR} registry.plugin.dir=${REGISTRY_PLUGIN_DIR}
registry.plugin.binding=registry
registry.plugin.name=${REGISTRY_PLUGIN_NAME}
registry.servers=${REGISTRY_SERVERS} registry.servers=${REGISTRY_SERVERS}
#maven.local.repository=/usr/local/localRepository
#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml

2
docker/build/startup-init-conf.sh

@ -37,7 +37,7 @@ export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
#============================================================================ #============================================================================
# Registry # Registry
#============================================================================ #============================================================================
export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry/zookeeper"} export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry"}
export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"} export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"}
export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"} export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"}

5
docker/docker-swarm/config.env.sh

@ -37,10 +37,9 @@ DATABASE_PARAMS=characterEncoding=utf8
# DATABASE_PARAMS=useUnicode=true&characterEncoding=UTF-8 # DATABASE_PARAMS=useUnicode=true&characterEncoding=UTF-8
#============================================================================ #============================================================================
# ZooKeeper # Registry
#============================================================================ #============================================================================
REGISTRY_PLUGIN_DIR=lib/plugin/registry
REGISTRY_PLUGIN_DIR=lib/plugin/registry/zookeeper
REGISTRY_PLUGIN_NAME=zookeeper REGISTRY_PLUGIN_NAME=zookeeper
REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181 REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181

24
docker/kubernetes/dolphinscheduler/templates/_helpers.tpl

@ -162,21 +162,27 @@ Create a database environment variables.
{{- end }} {{- end }}
{{- end -}} {{- end -}}
{{/* todo {{/*
Create a rregistry environment variables. Create a registry environment variables.
*/}} */}}
{{- define "dolphinscheduler.zookeeper.env_vars" -}} {{- define "dolphinscheduler.registry.env_vars" -}}
- name: ZOOKEEPER_QUORUM - name: REGISTRY_PLUGIN_DIR
{{- if .Values.zookeeper.enabled }} {{- if .Values.zookeeper.enabled }}
value: {{ template "dolphinscheduler.zookeeper.quorum" . }} value: "lib/plugin/registry"
{{- else }}
value: {{ .Values.externalRegistry.registryPluginDir }}
{{- end }}
- name: REGISTRY_PLUGIN_NAME
{{- if .Values.zookeeper.enabled }}
value: "zookeeper"
{{- else }} {{- else }}
value: {{ .Values.externalZookeeper.zookeeperQuorum }} value: {{ .Values.externalRegistry.registryPluginName }}
{{- end }} {{- end }}
- name: ZOOKEEPER_ROOT - name: REGISTRY_SERVERS
{{- if .Values.zookeeper.enabled }} {{- if .Values.zookeeper.enabled }}
value: {{ .Values.zookeeper.zookeeperRoot }} value: {{ template "dolphinscheduler.zookeeper.quorum" . }}
{{- else }} {{- else }}
value: {{ .Values.externalZookeeper.zookeeperRoot }} value: {{ .Values.externalRegistry.registryServers }}
{{- end }} {{- end }}
{{- end -}} {{- end -}}

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java

@ -72,6 +72,18 @@ public class Result<T> {
return new Result<>(data); return new Result<>(data);
} }
public boolean isSuccess() {
return this.isStatus(Status.SUCCESS);
}
public boolean isFailed() {
return !this.isSuccess();
}
public boolean isStatus(Status status) {
return this.code != null && this.code.equals(status.getCode());
}
/** /**
* Call this function if there is any error * Call this function if there is any error
* *
@ -120,10 +132,11 @@ public class Result<T> {
@Override @Override
public String toString() { public String toString() {
return "Status{" + return "Status{"
"code='" + code + '\'' + + "code='" + code
", msg='" + msg + '\'' + + '\'' + ", msg='"
", data=" + data + + msg + '\''
'}'; + ", data=" + data
+ '}';
} }
} }

74
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -73,28 +73,27 @@ public class ProcessDefinitionControllerTest {
@Test @Test
public void testCreateProcessDefinition() throws Exception { public void testCreateProcessDefinition() throws Exception {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\""
+ "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," + ":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]"; + "necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\""
+ ",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test"; String projectName = "test";
String name = "dag_test"; String name = "dag_test";
String description = "desc test"; String description = "desc test";
String globalParams = "[]";
String connects = "[]"; String connects = "[]";
String locations = "[]";
int timeout = 0;
String tenantCode = "root";
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1); result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, description, globalParams, Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, json,
connects, locations, timeout, tenantCode, json)).thenReturn(result); description, locations, connects)).thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectName, name, description, globalParams, Result response = processDefinitionController.createProcessDefinition(user, projectName, name, json,
connects, locations, timeout, tenantCode, json); locations, connects, description);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response.isSuccess());
} }
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) { private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
@ -117,35 +116,34 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.verifyProcessDefinitionName(user, projectName, name)).thenReturn(result); Mockito.when(processDefinitionService.verifyProcessDefinitionName(user, projectName, name)).thenReturn(result);
Result response = processDefinitionController.verifyProcessDefinitionName(user, projectName, name); Result response = processDefinitionController.verifyProcessDefinitionName(user, projectName, name);
Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST.getCode(), response.getCode().intValue()); Assert.assertTrue(response.isStatus(Status.PROCESS_DEFINITION_NAME_EXIST));
} }
@Test @Test
public void updateProcessDefinition() { public void updateProcessDefinition() throws Exception {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\""
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]"; + ",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"}"
+ ",\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\""
+ ":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\""
+ ":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
String projectName = "test"; String projectName = "test";
String name = "dag_test"; String name = "dag_test";
String description = "desc test"; String description = "desc test";
String connects = "[]"; String connects = "[]";
String globalParams = "[]"; int id = 1;
int timeout = 0;
String tenantCode = "root";
long code = 123L;
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put("processDefinitionId", 1); result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, name, code, description, globalParams, Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, id, name, json,
connects, locations, timeout, tenantCode, json)).thenReturn(result); description, locations, connects)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, code, description, globalParams, Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, id, json,
connects, locations, timeout, tenantCode, json, ReleaseState.OFFLINE); locations, connects, description,ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess()); }
}
@Test @Test
public void testReleaseProcessDefinition() throws Exception { public void testReleaseProcessDefinition() throws Exception {
@ -156,7 +154,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectName, id, ReleaseState.OFFLINE)).thenReturn(result); Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectName, id, ReleaseState.OFFLINE)).thenReturn(result);
Result response = processDefinitionController.releaseProcessDefinition(user, projectName, id, ReleaseState.OFFLINE); Result response = processDefinitionController.releaseProcessDefinition(user, projectName, id, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -189,7 +187,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.queryProcessDefinitionById(user, projectName, id)).thenReturn(result); Mockito.when(processDefinitionService.queryProcessDefinitionById(user, projectName, id)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionById(user, projectName, id); Result response = processDefinitionController.queryProcessDefinitionById(user, projectName, id);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -205,7 +203,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.batchCopyProcessDefinition(user, projectName, id, targetProjectId)).thenReturn(result); Mockito.when(processDefinitionService.batchCopyProcessDefinition(user, projectName, id, targetProjectId)).thenReturn(result);
Result response = processDefinitionController.copyProcessDefinition(user, projectName, id, targetProjectId); Result response = processDefinitionController.copyProcessDefinition(user, projectName, id, targetProjectId);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -221,7 +219,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.batchMoveProcessDefinition(user, projectName, id, targetProjectId)).thenReturn(result); Mockito.when(processDefinitionService.batchMoveProcessDefinition(user, projectName, id, targetProjectId)).thenReturn(result);
Result response = processDefinitionController.moveProcessDefinition(user, projectName, id, targetProjectId); Result response = processDefinitionController.moveProcessDefinition(user, projectName, id, targetProjectId);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -237,7 +235,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.queryProcessDefinitionList(user, projectName)).thenReturn(result); Mockito.when(processDefinitionService.queryProcessDefinitionList(user, projectName)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionList(user, projectName); Result response = processDefinitionController.queryProcessDefinitionList(user, projectName);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
public List<ProcessDefinition> getDefinitionList() { public List<ProcessDefinition> getDefinitionList() {
@ -292,7 +290,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.deleteProcessDefinitionById(user, projectName, id)).thenReturn(result); Mockito.when(processDefinitionService.deleteProcessDefinitionById(user, projectName, id)).thenReturn(result);
Result response = processDefinitionController.deleteProcessDefinitionById(user, projectName, id); Result response = processDefinitionController.deleteProcessDefinitionById(user, projectName, id);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -306,7 +304,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(code)).thenReturn(result); Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(code)).thenReturn(result);
Result response = processDefinitionController.getNodeListByDefinitionCode(user, projectName, code); Result response = processDefinitionController.getNodeListByDefinitionCode(user, projectName, code);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -320,7 +318,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCodeList(codeList)).thenReturn(result); Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCodeList(codeList)).thenReturn(result);
Result response = processDefinitionController.getNodeListByDefinitionCodeList(user, projectName, codeList); Result response = processDefinitionController.getNodeListByDefinitionCodeList(user, projectName, codeList);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -332,7 +330,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.queryProcessDefinitionAllByProjectId(projectId)).thenReturn(result); Mockito.when(processDefinitionService.queryProcessDefinitionAllByProjectId(projectId)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionAllByProjectId(user, projectId); Result response = processDefinitionController.queryProcessDefinitionAllByProjectId(user, projectId);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -346,7 +344,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.viewTree(processId, limit)).thenReturn(result); Mockito.when(processDefinitionService.viewTree(processId, limit)).thenReturn(result);
Result response = processDefinitionController.viewTree(user, projectName, processId, limit); Result response = processDefinitionController.viewTree(user, projectName, processId, limit);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test
@ -364,7 +362,7 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectName, searchVal, pageNo, pageSize, userId)).thenReturn(result); Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectName, searchVal, pageNo, pageSize, userId)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectName, pageNo, searchVal, userId, pageSize); Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectName, pageNo, searchVal, userId, pageSize);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); Assert.assertTrue(response != null && response.isSuccess());
} }
@Test @Test

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java

@ -124,12 +124,9 @@ public class RegistryCenter {
DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig(); DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING)); registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING));
if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) { if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) {
registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim()); registryPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim());
} }
if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) {
registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
}
if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) { if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) {
registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim()); registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
} }

16
dolphinscheduler-service/src/main/resources/registry.properties

@ -15,15 +15,13 @@
# limitations under the License. # limitations under the License.
# #
#registry.plugin.dir config the Alert Plugin dir . AlertServer while find and load the Alert Plugin Jar from this dir when deploy and start AlertServer on the server . #registry.plugin.dir config the Registry Plugin dir.
#registry.plugin.dir=/Users/kris/workspace/incubator-dolphinscheduler/dolphinscheduler-dist/target/dolphinscheduler-dist-1.3.6-SNAPSHOT/lib/plugin/registry/zookeeper registry.plugin.dir=lib/plugin/registry
#registry.plugin.name=zookeeper
#registry.plugin.binding=registry
#registry.servers=127.0.0.1:2181
#maven.local.repository=/Users/gaojun/Documents/jianguoyun/localRepository
#registry.plugin.binding config the Alert Plugin need be load when development and run in IDE registry.plugin.name=zookeeper
#registry.plugin.binding=\ registry.servers=127.0.0.1:2181
# ./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
#maven.local.repository=/usr/local/localRepository
#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml

Loading…
Cancel
Save