From d8e82b4ae68cde48e1118995d58d4b5967995ebc Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Sun, 5 Sep 2021 18:23:27 +0800 Subject: [PATCH] Support starting standalone server in Docker image (#6102) Also remove unused class --- .../supervisor/supervisor.ini | 15 ++ docker/build/startup.sh | 17 +- .../server/master/future/TaskFuture.java | 175 ------------------ 3 files changed, 26 insertions(+), 181 deletions(-) delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java diff --git a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini index c8c4e126c2..19166f48d9 100644 --- a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini +++ b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini @@ -90,3 +90,18 @@ killasgroup=true redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 + +[program:standalone] +command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start standalone-server +directory=%(ENV_DOLPHINSCHEDULER_HOME)s +priority=999 +autostart=%(ENV_STANDALONE_START_ENABLED)s +autorestart=true +startsecs=5 +stopwaitsecs=3 +exitcodes=0 +stopasgroup=true +killasgroup=true +redirect_stderr=true +stdout_logfile=/dev/fd/1 +stdout_logfile_maxbytes=0 diff --git a/docker/build/startup.sh b/docker/build/startup.sh index ae1ed36776..7f3b7d0d20 100755 --- a/docker/build/startup.sh +++ b/docker/build/startup.sh @@ -24,6 +24,7 @@ export WORKER_START_ENABLED=false export API_START_ENABLED=false export ALERT_START_ENABLED=false export LOGGER_START_ENABLED=false +export STANDALONE_START_ENABLED=false # wait database waitDatabase() { @@ -67,12 +68,13 @@ waitZK() { printUsage() { echo -e "Dolphin Scheduler is a distributed and easy-to-expand visual DAG workflow scheduling system," echo -e "dedicated to solving the complex dependencies in data processing, making the scheduling system out of the box for data processing.\n" - echo -e "Usage: [ all | master-server | worker-server | api-server | alert-server ]\n" - printf "%-13s: %s\n" "all" "Run master-server, worker-server, api-server and alert-server" - printf "%-13s: %s\n" "master-server" "MasterServer is mainly responsible for DAG task split, task submission monitoring." - printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly responsible for task execution and providing log services." - printf "%-13s: %s\n" "api-server" "ApiServer is mainly responsible for processing requests and providing the front-end UI layer." - printf "%-13s: %s\n" "alert-server" "AlertServer mainly include Alarms." + echo -e "Usage: [ all | master-server | worker-server | api-server | alert-server | standalone-server ]\n" + printf "%-13s: %s\n" "all" "Run master-server, worker-server, api-server and alert-server" + printf "%-13s: %s\n" "master-server" "MasterServer is mainly responsible for DAG task split, task submission monitoring." + printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly responsible for task execution and providing log services." + printf "%-13s: %s\n" "api-server" "ApiServer is mainly responsible for processing requests and providing the front-end UI layer." + printf "%-13s: %s\n" "alert-server" "AlertServer mainly include Alarms." + printf "%-13s: %s\n" "standalone-server" "Standalone server that uses embedded zookeeper and database, only for testing and demostration." } # init config file @@ -110,6 +112,9 @@ case "$1" in waitDatabase export ALERT_START_ENABLED=true ;; + (standalone-server) + export STANDALONE_START_ENABLED=true + ;; (help) printUsage exit 1 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java deleted file mode 100644 index bab4acc23e..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.master.future; - - -import org.apache.dolphinscheduler.remote.command.Command; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * task future - */ -public class TaskFuture { - - private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class); - - private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); - - /** - * request unique identification - */ - private final long opaque; - - /** - * timeout - */ - private final long timeoutMillis; - - private final CountDownLatch latch = new CountDownLatch(1); - - private final long beginTimestamp = System.currentTimeMillis(); - - /** - * response command - */ - private AtomicReference responseCommandReference = new AtomicReference<>(); - - private volatile boolean sendOk = true; - - private AtomicReference causeReference; - - public TaskFuture(long opaque, long timeoutMillis) { - this.opaque = opaque; - this.timeoutMillis = timeoutMillis; - FUTURE_TABLE.put(opaque, this); - } - - /** - * wait for response - * @return command - * @throws InterruptedException if error throws InterruptedException - */ - public Command waitResponse() throws InterruptedException { - this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); - return this.responseCommandReference.get(); - } - - /** - * put response - * - * @param responseCommand responseCommand - */ - public void putResponse(final Command responseCommand) { - responseCommandReference.set(responseCommand); - this.latch.countDown(); - FUTURE_TABLE.remove(opaque); - } - - /** - * whether timeout - * @return timeout - */ - public boolean isTimeout() { - long diff = System.currentTimeMillis() - this.beginTimestamp; - return diff > this.timeoutMillis; - } - - public static void notify(final Command responseCommand){ - TaskFuture taskFuture = FUTURE_TABLE.remove(responseCommand.getOpaque()); - if(taskFuture != null){ - taskFuture.putResponse(responseCommand); - } - } - - - public boolean isSendOK() { - return sendOk; - } - - public void setSendOk(boolean sendOk) { - this.sendOk = sendOk; - } - - public void setCause(Throwable cause) { - causeReference.set(cause); - } - - public Throwable getCause() { - return causeReference.get(); - } - - public long getOpaque() { - return opaque; - } - - public long getTimeoutMillis() { - return timeoutMillis; - } - - public long getBeginTimestamp() { - return beginTimestamp; - } - - public Command getResponseCommand() { - return responseCommandReference.get(); - } - - public void setResponseCommand(Command responseCommand) { - responseCommandReference.set(responseCommand); - } - - - /** - * scan future table - */ - public static void scanFutureTable(){ - final List futureList = new LinkedList<>(); - Iterator> it = FUTURE_TABLE.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry next = it.next(); - TaskFuture future = next.getValue(); - if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { - futureList.add(future); - it.remove(); - LOGGER.warn("remove timeout request : {}", future); - } - } - } - - @Override - public String toString() { - return "TaskFuture{" + - "opaque=" + opaque + - ", timeoutMillis=" + timeoutMillis + - ", latch=" + latch + - ", beginTimestamp=" + beginTimestamp + - ", responseCommand=" + responseCommandReference.get() + - ", sendOk=" + sendOk + - ", cause=" + causeReference.get() + - '}'; - } -}