From 7d79a2165ee5f0bf1aa7c949c852da179d6c0cc9 Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Tue, 5 Jul 2022 11:05:20 +0800 Subject: [PATCH] [Optimization]Optimize some details of MLFlow task plugin #10740 (#10739) * Optimize some details of MLFlow task plugin * Update dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java * fix some nips Co-authored-by: Jiajie Zhong --- .../dolphinscheduler-task-mlflow/pom.xml | 5 +++ .../plugin/task/mlflow/MlflowConstants.java | 14 ++++--- .../plugin/task/mlflow/MlflowParameters.java | 4 ++ .../plugin/task/mlflow/MlflowTask.java | 41 ++++++++++++++++++- .../plugin/task/mlflow/MlflowTaskTest.java | 36 +++++++--------- 5 files changed, 71 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml index f6691e5166..7e21ccab77 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml @@ -34,6 +34,11 @@ dolphinscheduler-spi provided + + org.apache.dolphinscheduler + dolphinscheduler-common + provided + org.apache.dolphinscheduler dolphinscheduler-task-api diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java index 4b99cfa7cd..8712081ad7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java @@ -90,7 +90,9 @@ public class MlflowConstants { public static final String DOCKER_RREMOVE_CONTAINER = "docker rm -f %s"; - public static final String DOCKER_RUN = "docker run --name=%s -p=%s:8080 %s"; + public static final String DOCKER_RUN = "docker run -d --name=%s -p=%s:8080 " + + "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20" + + " %s"; public static final String DOCKER_COMPOSE_RUN = "docker-compose up -d"; @@ -100,10 +102,10 @@ public class MlflowConstants { "export DS_TASK_MLFLOW_CPU_LIMIT=%s\n" + "export DS_TASK_MLFLOW_MEMORY_LIMIT=%s"; - public static final String DOCKER_HEALTH_CHECK_COMMAND = "for i in $(seq 1 300); " + - "do " + - "[ $(docker inspect --format \"{{json .State.Health.Status }}\" %s) = '\"healthy\"' ] " + - "&& exit 0 && break;sleep 1; " + - "done; docker-compose down; exit 1"; + public static final String DOCKER_HEALTH_CHECK = "docker inspect --format \"{{json .State.Health.Status }}\" %s"; + + public static final int DOCKER_HEALTH_CHECK_TIMEOUT = 20; + + public static final int DOCKER_HEALTH_CHECK_INTERVAL = 5000; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java index a49fd051ed..4e47c8ae64 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java @@ -286,4 +286,8 @@ public class MlflowParameters extends AbstractParameters { return containerName; } + public boolean getIsDeployDocker(){ + return deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER) || deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE); + } + }; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index e1e13d17c5..88da235ff0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import java.util.ArrayList; import java.util.HashMap; @@ -84,7 +86,13 @@ public class MlflowTask extends AbstractTaskExecutor { // construct process String command = buildCommand(); TaskResponse commandExecuteResult = shellCommandExecutor.run(command); - setExitStatusCode(commandExecuteResult.getExitStatusCode()); + int exitCode = exitStatusCode; + if (mlflowParameters.getIsDeployDocker()){ + exitCode = checkDockerHealth(); + }else { + exitCode = getExitStatusCode(); + } + setExitStatusCode(exitCode); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool()); @@ -181,10 +189,12 @@ public class MlflowTask extends AbstractTaskExecutor { String templatePath = getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE); args.add(String.format("cp %s %s", templatePath, taskExecutionContext.getExecutePath())); String imageName = "mlflow/" + mlflowParameters.getModelKeyName(":"); + String containerName = mlflowParameters.getContainerName(); + args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName)); + args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName)); args.add(mlflowParameters.getDockerComposeEnvCommand()); args.add(MlflowConstants.DOCKER_COMPOSE_RUN); - args.add(String.format(MlflowConstants.DOCKER_HEALTH_CHECK_COMMAND, mlflowParameters.getContainerName())); } String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap)); @@ -197,6 +207,33 @@ public class MlflowTask extends AbstractTaskExecutor { } + public int checkDockerHealth() throws Exception { + logger.info("checking container healthy ... "); + int exitCode = -1; + String[] command = {"sh", "-c", String.format(MlflowConstants.DOCKER_HEALTH_CHECK, mlflowParameters.getContainerName())}; + for(int x = 0; x < MlflowConstants.DOCKER_HEALTH_CHECK_TIMEOUT; x = x+1) { + String status; + try { + status = OSUtils.exeShell(command).replace("\n", "").replace("\"", ""); + } catch (Exception e) { + status = String.format("error --- %s", e.getMessage()); + } + logger.info("container healthy status: {}", status); + + if (status.equals("healthy")) { + exitCode = 0; + logger.info("container is healthy"); + return exitCode; + }else { + logger.info("The health check has been running for {} seconds", x * MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL / 1000); + ThreadUtils.sleep(MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL); + } + } + + logger.info("health check fail"); + return exitCode; + } + @Override public AbstractParameters getParameters() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java index 5a85abf3c0..f985666006 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java @@ -22,6 +22,7 @@ import java.util.UUID; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask; @@ -41,7 +42,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor; import org.apache.dolphinscheduler.spi.utils.JSONUtils; - @RunWith(PowerMockRunner.class) @PrepareForTest({ JSONUtils.class, @@ -129,9 +129,7 @@ public class MlflowTaskTest { MlflowTask mlflowTask = initTask(createModelDeplyMlflowParameters()); Assert.assertEquals(mlflowTask.buildCommand(), "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + - "mlflow models serve -m runs:/a272ec279fc34a8995121ae04281585f/model " + - "--port 7000 " + - "-h 0.0.0.0"); + "mlflow models serve -m models:/model/1 --port 7000 -h 0.0.0.0"); } @Test @@ -139,12 +137,11 @@ public class MlflowTaskTest { MlflowTask mlflowTask = initTask(createModelDeplyDockerParameters()); Assert.assertEquals(mlflowTask.buildCommand(), "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + - "mlflow models build-docker -m runs:/a272ec279fc34a8995121ae04281585f/model " + - "-n mlflow/a272ec279fc34a8995121ae04281585f:model " + - "--enable-mlserver\n" + - "docker rm -f ds-mlflow-a272ec279fc34a8995121ae04281585f-model\n" + - "docker run --name=ds-mlflow-a272ec279fc34a8995121ae04281585f-model " + - "-p=7000:8080 mlflow/a272ec279fc34a8995121ae04281585f:model"); + "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" + + "docker rm -f ds-mlflow-model-1\n" + + "docker run -d --name=ds-mlflow-model-1 -p=7000:8080 " + + "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20 " + + "mlflow/model:1"); } @Test @@ -154,16 +151,14 @@ public class MlflowTaskTest { "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + "cp " + mlflowTask.getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE) + " /tmp/dolphinscheduler_test\n" + - "mlflow models build-docker -m models:/22222/1 -n mlflow/22222:1 --enable-mlserver\n" + - "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/22222:1\n" + - "export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-22222-1\n" + + "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" + + "docker rm -f ds-mlflow-model-1\n" + + "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/model:1\n" + + "export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-model-1\n" + "export DS_TASK_MLFLOW_DEPLOY_PORT=7000\n" + "export DS_TASK_MLFLOW_CPU_LIMIT=0.5\n" + "export DS_TASK_MLFLOW_MEMORY_LIMIT=200m\n" + - "docker-compose up -d\n" + - "for i in $(seq 1 300); do " + - "[ $(docker inspect --format \"{{json .State.Health.Status }}\" ds-mlflow-22222-1) = '\"healthy\"' ] && exit 0 && break;sleep 1; " + - "done; docker-compose down; exit 1"); + "docker-compose up -d"); } private MlflowTask initTask(MlflowParameters mlflowParameters) { @@ -172,7 +167,6 @@ public class MlflowTaskTest { mlflowTask.init(); mlflowTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); return mlflowTask; - } private MlflowParameters createBasicAlgorithmParameters() { @@ -218,7 +212,7 @@ public class MlflowTaskTest { mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); - mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model"); + mlflowParameters.setDeployModelKey("models:/model/1"); mlflowParameters.setDeployPort("7000"); return mlflowParameters; } @@ -228,7 +222,7 @@ public class MlflowTaskTest { mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); - mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model"); + mlflowParameters.setDeployModelKey("models:/model/1"); mlflowParameters.setDeployPort("7000"); return mlflowParameters; } @@ -238,7 +232,7 @@ public class MlflowTaskTest { mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); - mlflowParameters.setDeployModelKey("models:/22222/1"); + mlflowParameters.setDeployModelKey("models:/model/1"); mlflowParameters.setDeployPort("7000"); mlflowParameters.setCpuLimit("0.5"); mlflowParameters.setMemoryLimit("200m");