Wenjun Ruan
2 years ago
committed by
GitHub
128 changed files with 1479 additions and 1214 deletions
@ -1,60 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.storage; |
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_HDFS; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_OSS; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_S3; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils; |
||||
import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils; |
||||
import org.apache.dolphinscheduler.service.storage.impl.OssOperator; |
||||
import org.apache.dolphinscheduler.service.storage.impl.S3Utils; |
||||
|
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
/** |
||||
* choose the impl of storage by RESOURCE_STORAGE_TYPE |
||||
*/ |
||||
|
||||
@Component |
||||
@Configuration |
||||
public class StoreConfiguration { |
||||
|
||||
@Bean |
||||
public StorageOperate storageOperate() { |
||||
switch (PropertyUtils.getUpperCaseString(RESOURCE_STORAGE_TYPE)) { |
||||
case STORAGE_OSS: |
||||
OssOperator ossOperator = new OssOperator(); |
||||
// TODO: change to use ossOperator.init(ossConnection) after DS supports Configuration / Connection
|
||||
// Center
|
||||
ossOperator.init(); |
||||
return ossOperator; |
||||
case STORAGE_S3: |
||||
return S3Utils.getInstance(); |
||||
case STORAGE_HDFS: |
||||
return HadoopUtils.getInstance(); |
||||
default: |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
} |
@ -1,56 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.service.log; |
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import ch.qos.logback.classic.Level; |
||||
import ch.qos.logback.classic.spi.LoggingEvent; |
||||
import ch.qos.logback.core.spi.FilterReply; |
||||
|
||||
public class MasterLogFilterTest { |
||||
|
||||
@Test |
||||
public void decide() { |
||||
MasterLogFilter masterLogFilter = new MasterLogFilter(); |
||||
|
||||
FilterReply filterReply = masterLogFilter.decide(new LoggingEvent() { |
||||
|
||||
@Override |
||||
public String getThreadName() { |
||||
return Constants.THREAD_NAME_MASTER_SERVER; |
||||
} |
||||
|
||||
@Override |
||||
public Level getLevel() { |
||||
return Level.INFO; |
||||
} |
||||
|
||||
@Override |
||||
public String getMessage() { |
||||
return "master insert into queue success, task : shell2"; |
||||
} |
||||
|
||||
}); |
||||
|
||||
Assertions.assertEquals(FilterReply.ACCEPT, filterReply); |
||||
|
||||
} |
||||
} |
@ -1,66 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.service.log; |
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import ch.qos.logback.classic.Level; |
||||
import ch.qos.logback.classic.spi.LoggingEvent; |
||||
import ch.qos.logback.core.spi.FilterReply; |
||||
|
||||
public class WorkerLogFilterTest { |
||||
|
||||
@Test |
||||
public void decide() { |
||||
WorkerLogFilter workerLogFilter = new WorkerLogFilter(); |
||||
|
||||
FilterReply filterReply = workerLogFilter.decide(new LoggingEvent() { |
||||
|
||||
@Override |
||||
public String getThreadName() { |
||||
return Constants.THREAD_NAME_WORKER_SERVER; |
||||
} |
||||
|
||||
@Override |
||||
public Level getLevel() { |
||||
return Level.INFO; |
||||
} |
||||
|
||||
@Override |
||||
public String getMessage() { |
||||
return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; |
||||
} |
||||
|
||||
@Override |
||||
public Object[] getArgumentArray() { |
||||
return new Object[0]; |
||||
} |
||||
|
||||
@Override |
||||
public String getFormattedMessage() { |
||||
return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; |
||||
} |
||||
|
||||
}); |
||||
|
||||
Assertions.assertEquals(FilterReply.ACCEPT, filterReply); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,52 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-all</artifactId> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-s3</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-hdfs</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-oss</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
</dependencies> |
||||
|
||||
</project> |
@ -0,0 +1,36 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-spi</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,50 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.storage.api; |
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils; |
||||
|
||||
import java.util.Optional; |
||||
import java.util.ServiceLoader; |
||||
|
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
// todo: If we move the config to yaml
|
||||
@Configuration |
||||
public class StorageConfiguration { |
||||
|
||||
@Bean |
||||
public StorageOperate storageOperate() { |
||||
Optional<StorageType> storageTypeOptional = |
||||
StorageType.getStorageType(PropertyUtils.getUpperCaseString(RESOURCE_STORAGE_TYPE)); |
||||
Optional<StorageOperate> storageOperate = storageTypeOptional.map(storageType -> { |
||||
ServiceLoader<StorageOperateFactory> storageOperateFactories = |
||||
ServiceLoader.load(StorageOperateFactory.class); |
||||
for (StorageOperateFactory storageOperateFactory : storageOperateFactories) { |
||||
if (storageOperateFactory.getStorageOperate() == storageType) { |
||||
return storageOperateFactory.createStorageOperate(); |
||||
} |
||||
} |
||||
return null; |
||||
}); |
||||
return storageOperate.orElse(null); |
||||
} |
||||
} |
@ -0,0 +1,53 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.storage.api; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
public enum StorageType { |
||||
|
||||
HDFS(0, "HDFS"), |
||||
OSS(1, "OSS"), |
||||
S3(2, "S3"), |
||||
; |
||||
|
||||
private final int code; |
||||
private final String name; |
||||
|
||||
StorageType(int code, String name) { |
||||
this.code = code; |
||||
this.name = name; |
||||
} |
||||
|
||||
public int getCode() { |
||||
return code; |
||||
} |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public static Optional<StorageType> getStorageType(String name) { |
||||
for (StorageType storageType : StorageType.values()) { |
||||
if (storageType.getName().equals(name)) { |
||||
return Optional.of(storageType); |
||||
} |
||||
} |
||||
return Optional.empty(); |
||||
} |
||||
} |
@ -0,0 +1,232 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-hdfs</artifactId> |
||||
|
||||
<dependencies> |
||||
<!-- This is used to load Kerberos Conf --> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-datasource-api</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.hadoop</groupId> |
||||
<artifactId>hadoop-common</artifactId> |
||||
<scope>provided</scope> |
||||
<exclusions> |
||||
<exclusion> |
||||
<groupId>org.slf4j</groupId> |
||||
<artifactId>slf4j-log4j12</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>jdk.tools</groupId> |
||||
<artifactId>jdk.tools</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>javax.servlet</groupId> |
||||
<artifactId>servlet-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>javax.servlet</groupId> |
||||
<artifactId>servlet-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>log4j</groupId> |
||||
<artifactId>log4j</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.apache.curator</groupId> |
||||
<artifactId>curator-client</artifactId> |
||||
</exclusion> |
||||
|
||||
<exclusion> |
||||
<groupId>commons-configuration</groupId> |
||||
<artifactId>commons-configuration</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>io.grpc</groupId> |
||||
<artifactId>grpc-protobuf</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>io.netty</groupId> |
||||
<artifactId>netty</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.codehaus.jackson</groupId> |
||||
<artifactId>jackson-core-asl</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.codehaus.jackson</groupId> |
||||
<artifactId>jackson-mapper-asl</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.google.protobuf</groupId> |
||||
<artifactId>jackson-mapper-asl</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.google.code.gson</groupId> |
||||
<artifactId>gson</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>xmlenc</groupId> |
||||
<artifactId>xmlenc</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>commons-net</groupId> |
||||
<artifactId>commons-net</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.apache.avro</groupId> |
||||
<artifactId>avro</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.apache.zookeeper</groupId> |
||||
<artifactId>zookeeper</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>javax.servlet.jsp</groupId> |
||||
<artifactId>jsp-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-json</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-server</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-core</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.mortbay.jetty</groupId> |
||||
<artifactId>jetty</artifactId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.hadoop</groupId> |
||||
<artifactId>hadoop-client</artifactId> |
||||
<scope>provided</scope> |
||||
<exclusions> |
||||
<exclusion> |
||||
<groupId>org.slf4j</groupId> |
||||
<artifactId>slf4j-log4j12</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>javax.servlet</groupId> |
||||
<artifactId>servlet-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.codehaus.jackson</groupId> |
||||
<artifactId>jackson-jaxrs</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.codehaus.jackson</groupId> |
||||
<artifactId>jackson-xc</artifactId> |
||||
</exclusion> |
||||
|
||||
<exclusion> |
||||
<groupId>org.fusesource.leveldbjni</groupId> |
||||
<artifactId>leveldbjni-all</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.apache.zookeeper</groupId> |
||||
<artifactId>zookeeper</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.apache.hadoop</groupId> |
||||
<artifactId>hadoop-mapreduce-client-shuffle</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-client</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-core</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>javax.xml.bind</groupId> |
||||
<artifactId>jaxb-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>log4j</groupId> |
||||
<artifactId>log4j</artifactId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.hadoop</groupId> |
||||
<artifactId>hadoop-hdfs</artifactId> |
||||
<scope>provided</scope> |
||||
<exclusions> |
||||
<exclusion> |
||||
<groupId>javax.servlet</groupId> |
||||
<artifactId>servlet-api</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>io.netty</groupId> |
||||
<artifactId>netty</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.google.protobuf</groupId> |
||||
<artifactId>protobuf-java</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>xmlenc</groupId> |
||||
<artifactId>xmlenc</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>io.netty</groupId> |
||||
<artifactId>netty-all</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>org.fusesource.leveldbjni</groupId> |
||||
<artifactId>leveldbjni-all</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-core</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>com.sun.jersey</groupId> |
||||
<artifactId>jersey-server</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>log4j</groupId> |
||||
<artifactId>log4j</artifactId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,42 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-oss</artifactId> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-task-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,46 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-s3</artifactId> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-task-api</artifactId> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>com.amazonaws</groupId> |
||||
<artifactId>aws-java-sdk-s3</artifactId> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,49 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<packaging>pom</packaging> |
||||
|
||||
<modules> |
||||
<module>dolphinscheduler-storage-api</module> |
||||
<module>dolphinscheduler-storage-all</module> |
||||
<module>dolphinscheduler-storage-s3</module> |
||||
<module>dolphinscheduler-storage-hdfs</module> |
||||
<module>dolphinscheduler-storage-oss</module> |
||||
</modules> |
||||
|
||||
<dependencyManagement> |
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-bom</artifactId> |
||||
<version>${project.version}</version> |
||||
<type>pom</type> |
||||
<scope>import</scope> |
||||
</dependency> |
||||
</dependencies> |
||||
</dependencyManagement> |
||||
</project> |
@ -1,106 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; |
||||
|
||||
import org.apache.commons.lang3.SystemUtils; |
||||
|
||||
import java.util.regex.Matcher; |
||||
import java.util.regex.Pattern; |
||||
|
||||
import lombok.NonNull; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public final class ProcessUtils { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class); |
||||
|
||||
private ProcessUtils() { |
||||
throw new IllegalStateException("Utility class"); |
||||
} |
||||
|
||||
/** |
||||
* Initialization regularization, solve the problem of pre-compilation performance, |
||||
* avoid the thread safety problem of multi-thread operation |
||||
*/ |
||||
private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)"); |
||||
|
||||
/** |
||||
* Expression of PID recognition in Windows scene |
||||
*/ |
||||
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)"); |
||||
|
||||
/** |
||||
* kill tasks according to different task types. |
||||
*/ |
||||
public static boolean kill(@NonNull TaskExecutionContext request) { |
||||
try { |
||||
logger.info("Begin kill task instance, processId: {}", request.getProcessId()); |
||||
int processId = request.getProcessId(); |
||||
if (processId == 0) { |
||||
logger.error("Task instance kill failed, processId is not exist"); |
||||
return false; |
||||
} |
||||
|
||||
String cmd = String.format("kill -9 %s", getPidsStr(processId)); |
||||
cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd); |
||||
logger.info("process id:{}, cmd:{}", processId, cmd); |
||||
|
||||
OSUtils.exeCmd(cmd); |
||||
logger.info("Success kill task instance, processId: {}", request.getProcessId()); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("Kill task instance error, processId: {}", request.getProcessId(), e); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get pids str. |
||||
* |
||||
* @param processId process id |
||||
* @return pids pid String |
||||
* @throws Exception exception |
||||
*/ |
||||
public static String getPidsStr(int processId) throws Exception { |
||||
StringBuilder sb = new StringBuilder(); |
||||
Matcher mat = null; |
||||
// pstree pid get sub pids
|
||||
if (SystemUtils.IS_OS_MAC) { |
||||
String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); |
||||
if (null != pids) { |
||||
mat = MACPATTERN.matcher(pids); |
||||
} |
||||
} else { |
||||
String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); |
||||
mat = WINDOWSATTERN.matcher(pids); |
||||
} |
||||
|
||||
if (null != mat) { |
||||
while (mat.find()) { |
||||
sb.append(mat.group(1)).append(" "); |
||||
} |
||||
} |
||||
|
||||
return sb.toString().trim(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,402 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants; |
||||
import org.apache.dolphinscheduler.common.exception.BaseException; |
||||
import org.apache.dolphinscheduler.common.utils.HttpUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.KerberosHttpClient; |
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
import org.apache.commons.lang3.SystemUtils; |
||||
|
||||
import java.io.File; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.List; |
||||
import java.util.regex.Matcher; |
||||
import java.util.regex.Pattern; |
||||
|
||||
import lombok.NonNull; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode; |
||||
|
||||
public final class ProcessUtils { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class); |
||||
|
||||
private ProcessUtils() { |
||||
throw new IllegalStateException("Utility class"); |
||||
} |
||||
|
||||
/** |
||||
* Initialization regularization, solve the problem of pre-compilation performance, |
||||
* avoid the thread safety problem of multi-thread operation |
||||
*/ |
||||
private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)"); |
||||
|
||||
/** |
||||
* Expression of PID recognition in Windows scene |
||||
*/ |
||||
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)"); |
||||
|
||||
private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); |
||||
private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); |
||||
private static final String JOB_HISTORY_ADDRESS = |
||||
PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); |
||||
private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE = |
||||
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088); |
||||
|
||||
/** |
||||
* kill tasks according to different task types. |
||||
*/ |
||||
public static boolean kill(@NonNull TaskExecutionContext request) { |
||||
try { |
||||
logger.info("Begin kill task instance, processId: {}", request.getProcessId()); |
||||
int processId = request.getProcessId(); |
||||
if (processId == 0) { |
||||
logger.error("Task instance kill failed, processId is not exist"); |
||||
return false; |
||||
} |
||||
|
||||
String cmd = String.format("kill -9 %s", getPidsStr(processId)); |
||||
cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd); |
||||
logger.info("process id:{}, cmd:{}", processId, cmd); |
||||
|
||||
OSUtils.exeCmd(cmd); |
||||
logger.info("Success kill task instance, processId: {}", request.getProcessId()); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("Kill task instance error, processId: {}", request.getProcessId(), e); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get pids str. |
||||
* |
||||
* @param processId process id |
||||
* @return pids pid String |
||||
* @throws Exception exception |
||||
*/ |
||||
public static String getPidsStr(int processId) throws Exception { |
||||
StringBuilder sb = new StringBuilder(); |
||||
Matcher mat = null; |
||||
// pstree pid get sub pids
|
||||
if (SystemUtils.IS_OS_MAC) { |
||||
String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); |
||||
if (null != pids) { |
||||
mat = MACPATTERN.matcher(pids); |
||||
} |
||||
} else { |
||||
String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); |
||||
mat = WINDOWSATTERN.matcher(pids); |
||||
} |
||||
|
||||
if (null != mat) { |
||||
while (mat.find()) { |
||||
sb.append(mat.group(1)).append(" "); |
||||
} |
||||
} |
||||
|
||||
return sb.toString().trim(); |
||||
} |
||||
|
||||
/** |
||||
* kill yarn application. |
||||
* |
||||
* @param appIds app id list |
||||
* @param logger logger |
||||
* @param tenantCode tenant code |
||||
* @param executePath execute path |
||||
*/ |
||||
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) { |
||||
if (appIds == null || appIds.isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
for (String appId : appIds) { |
||||
try { |
||||
TaskExecutionStatus applicationStatus = getApplicationStatus(appId); |
||||
|
||||
if (!applicationStatus.isFinished()) { |
||||
String commandFile = String.format("%s/%s.kill", executePath, appId); |
||||
String cmd = getKerberosInitCommand() + "yarn application -kill " + appId; |
||||
execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd); |
||||
} |
||||
} catch (Exception e) { |
||||
logger.error("Get yarn application app id [{}}] status failed", appId, e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get the state of an application |
||||
* |
||||
* @param applicationId application id |
||||
* @return the return may be null or there may be other parse exceptions |
||||
*/ |
||||
public static TaskExecutionStatus getApplicationStatus(String applicationId) throws BaseException { |
||||
if (StringUtils.isEmpty(applicationId)) { |
||||
return null; |
||||
} |
||||
|
||||
String result; |
||||
String applicationUrl = getApplicationUrl(applicationId); |
||||
logger.debug("generate yarn application url, applicationUrl={}", applicationUrl); |
||||
|
||||
String responseContent = Boolean.TRUE |
||||
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) |
||||
? KerberosHttpClient.get(applicationUrl) |
||||
: HttpUtils.get(applicationUrl); |
||||
if (responseContent != null) { |
||||
ObjectNode jsonObject = JSONUtils.parseObject(responseContent); |
||||
if (!jsonObject.has("app")) { |
||||
return TaskExecutionStatus.FAILURE; |
||||
} |
||||
result = jsonObject.path("app").path("finalStatus").asText(); |
||||
|
||||
} else { |
||||
// may be in job history
|
||||
String jobHistoryUrl = getJobHistoryUrl(applicationId); |
||||
logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl); |
||||
responseContent = Boolean.TRUE |
||||
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) |
||||
? KerberosHttpClient.get(jobHistoryUrl) |
||||
: HttpUtils.get(jobHistoryUrl); |
||||
|
||||
if (null != responseContent) { |
||||
ObjectNode jsonObject = JSONUtils.parseObject(responseContent); |
||||
if (!jsonObject.has("job")) { |
||||
return TaskExecutionStatus.FAILURE; |
||||
} |
||||
result = jsonObject.path("job").path("state").asText(); |
||||
} else { |
||||
return TaskExecutionStatus.FAILURE; |
||||
} |
||||
} |
||||
|
||||
return getExecutionStatus(result); |
||||
} |
||||
|
||||
/** |
||||
* get application url |
||||
* if rmHaIds contains xx, it signs not use resourcemanager |
||||
* otherwise: |
||||
* if rmHaIds is empty, single resourcemanager enabled |
||||
* if rmHaIds not empty: resourcemanager HA enabled |
||||
* |
||||
* @param applicationId application id |
||||
* @return url of application |
||||
*/ |
||||
private static String getApplicationUrl(String applicationId) throws BaseException { |
||||
|
||||
String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS); |
||||
if (StringUtils.isBlank(appUrl)) { |
||||
throw new BaseException("yarn application url generation failed"); |
||||
} |
||||
logger.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId); |
||||
return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId); |
||||
} |
||||
|
||||
private static String getJobHistoryUrl(String applicationId) { |
||||
// eg:application_1587475402360_712719 -> job_1587475402360_712719
|
||||
String jobId = applicationId.replace("application", "job"); |
||||
return String.format(JOB_HISTORY_ADDRESS, jobId); |
||||
} |
||||
|
||||
/** |
||||
* build kill command for yarn application |
||||
* |
||||
* @param logger logger |
||||
* @param tenantCode tenant code |
||||
* @param appId app id |
||||
* @param commandFile command file |
||||
* @param cmd cmd |
||||
*/ |
||||
private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile, |
||||
String cmd) { |
||||
try { |
||||
StringBuilder sb = new StringBuilder(); |
||||
sb.append("#!/bin/sh\n"); |
||||
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); |
||||
sb.append("cd $BASEDIR\n"); |
||||
|
||||
sb.append("\n\n"); |
||||
sb.append(cmd); |
||||
|
||||
File f = new File(commandFile); |
||||
|
||||
if (!f.exists()) { |
||||
org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(), |
||||
StandardCharsets.UTF_8); |
||||
} |
||||
|
||||
String runCmd = String.format("%s %s", Constants.SH, commandFile); |
||||
runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd); |
||||
logger.info("kill cmd:{}", runCmd); |
||||
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd); |
||||
} catch (Exception e) { |
||||
logger.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage())); |
||||
} |
||||
} |
||||
|
||||
private static TaskExecutionStatus getExecutionStatus(String result) { |
||||
switch (result) { |
||||
case Constants.ACCEPTED: |
||||
return TaskExecutionStatus.SUBMITTED_SUCCESS; |
||||
case Constants.SUCCEEDED: |
||||
case Constants.ENDED: |
||||
return TaskExecutionStatus.SUCCESS; |
||||
case Constants.NEW: |
||||
case Constants.NEW_SAVING: |
||||
case Constants.SUBMITTED: |
||||
case Constants.FAILED: |
||||
return TaskExecutionStatus.FAILURE; |
||||
case Constants.KILLED: |
||||
return TaskExecutionStatus.KILL; |
||||
case Constants.RUNNING: |
||||
default: |
||||
return TaskExecutionStatus.RUNNING_EXECUTION; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* getAppAddress |
||||
* |
||||
* @param appAddress app address |
||||
* @param rmHa resource manager ha |
||||
* @return app address |
||||
*/ |
||||
private static String getAppAddress(String appAddress, String rmHa) { |
||||
|
||||
String[] split1 = appAddress.split(Constants.DOUBLE_SLASH); |
||||
|
||||
if (split1.length != 2) { |
||||
return null; |
||||
} |
||||
|
||||
String start = split1[0] + Constants.DOUBLE_SLASH; |
||||
String[] split2 = split1[1].split(Constants.COLON); |
||||
|
||||
if (split2.length != 2) { |
||||
return null; |
||||
} |
||||
|
||||
String end = Constants.COLON + split2[1]; |
||||
|
||||
// get active ResourceManager
|
||||
String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa); |
||||
|
||||
if (StringUtils.isEmpty(activeRM)) { |
||||
return null; |
||||
} |
||||
|
||||
return start + activeRM + end; |
||||
} |
||||
|
||||
/** |
||||
* get kerberos init command |
||||
*/ |
||||
private static String getKerberosInitCommand() { |
||||
logger.info("get kerberos init command"); |
||||
StringBuilder kerberosCommandBuilder = new StringBuilder(); |
||||
boolean hadoopKerberosState = |
||||
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); |
||||
if (hadoopKerberosState) { |
||||
kerberosCommandBuilder.append("export KRB5_CONFIG=") |
||||
.append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)) |
||||
.append("\n\n") |
||||
.append(String.format("kinit -k -t %s %s || true", |
||||
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH), |
||||
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME))) |
||||
.append("\n\n"); |
||||
logger.info("kerberos init command: {}", kerberosCommandBuilder); |
||||
} |
||||
return kerberosCommandBuilder.toString(); |
||||
} |
||||
|
||||
/** |
||||
* yarn ha admin utils |
||||
*/ |
||||
private static final class YarnHAAdminUtils { |
||||
|
||||
/** |
||||
* get active resourcemanager node |
||||
* |
||||
* @param protocol http protocol |
||||
* @param rmIds yarn ha ids |
||||
* @return yarn active node |
||||
*/ |
||||
public static String getActiveRMName(String protocol, String rmIds) { |
||||
|
||||
String[] rmIdArr = rmIds.split(Constants.COMMA); |
||||
|
||||
String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info"; |
||||
|
||||
try { |
||||
|
||||
/** |
||||
* send http get request to rm |
||||
*/ |
||||
|
||||
for (String rmId : rmIdArr) { |
||||
String state = getRMState(String.format(yarnUrl, rmId)); |
||||
if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) { |
||||
return rmId; |
||||
} |
||||
} |
||||
|
||||
} catch (Exception e) { |
||||
logger.error("yarn ha application url generation failed, message:{}", e.getMessage()); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* get ResourceManager state |
||||
*/ |
||||
public static String getRMState(String url) { |
||||
|
||||
String retStr = Boolean.TRUE |
||||
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) |
||||
? KerberosHttpClient.get(url) |
||||
: HttpUtils.get(url); |
||||
|
||||
if (StringUtils.isEmpty(retStr)) { |
||||
return null; |
||||
} |
||||
// to json
|
||||
ObjectNode jsonObject = JSONUtils.parseObject(retStr); |
||||
|
||||
// get ResourceManager state
|
||||
if (!jsonObject.has("clusterInfo")) { |
||||
return null; |
||||
} |
||||
return jsonObject.get("clusterInfo").path("haState").asText(); |
||||
} |
||||
} |
||||
|
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue