Browse Source

[Optimization]Optimize some details of MLFlow task plugin #10740 (#10739)

* Optimize some details of MLFlow task plugin

* Update dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java

* fix some nips

Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
3.1.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
7d79a2165e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml
  2. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java
  3. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java
  4. 41
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  5. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java

5
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml

@ -34,6 +34,11 @@
<artifactId>dolphinscheduler-spi</artifactId> <artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId> <artifactId>dolphinscheduler-task-api</artifactId>

14
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java

@ -90,7 +90,9 @@ public class MlflowConstants {
public static final String DOCKER_RREMOVE_CONTAINER = "docker rm -f %s"; public static final String DOCKER_RREMOVE_CONTAINER = "docker rm -f %s";
public static final String DOCKER_RUN = "docker run --name=%s -p=%s:8080 %s"; public static final String DOCKER_RUN = "docker run -d --name=%s -p=%s:8080 " +
"--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20" +
" %s";
public static final String DOCKER_COMPOSE_RUN = "docker-compose up -d"; public static final String DOCKER_COMPOSE_RUN = "docker-compose up -d";
@ -100,10 +102,10 @@ public class MlflowConstants {
"export DS_TASK_MLFLOW_CPU_LIMIT=%s\n" + "export DS_TASK_MLFLOW_CPU_LIMIT=%s\n" +
"export DS_TASK_MLFLOW_MEMORY_LIMIT=%s"; "export DS_TASK_MLFLOW_MEMORY_LIMIT=%s";
public static final String DOCKER_HEALTH_CHECK_COMMAND = "for i in $(seq 1 300); " +
"do " +
"[ $(docker inspect --format \"{{json .State.Health.Status }}\" %s) = '\"healthy\"' ] " +
"&& exit 0 && break;sleep 1; " +
"done; docker-compose down; exit 1";
public static final String DOCKER_HEALTH_CHECK = "docker inspect --format \"{{json .State.Health.Status }}\" %s";
public static final int DOCKER_HEALTH_CHECK_TIMEOUT = 20;
public static final int DOCKER_HEALTH_CHECK_INTERVAL = 5000;
} }

4
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java

@ -286,4 +286,8 @@ public class MlflowParameters extends AbstractParameters {
return containerName; return containerName;
} }
public boolean getIsDeployDocker(){
return deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER) || deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE);
}
}; };

41
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -84,7 +86,13 @@ public class MlflowTask extends AbstractTaskExecutor {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); int exitCode = exitStatusCode;
if (mlflowParameters.getIsDeployDocker()){
exitCode = checkDockerHealth();
}else {
exitCode = getExitStatusCode();
}
setExitStatusCode(exitCode);
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool()); mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
@ -181,10 +189,12 @@ public class MlflowTask extends AbstractTaskExecutor {
String templatePath = getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE); String templatePath = getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE);
args.add(String.format("cp %s %s", templatePath, taskExecutionContext.getExecutePath())); args.add(String.format("cp %s %s", templatePath, taskExecutionContext.getExecutePath()));
String imageName = "mlflow/" + mlflowParameters.getModelKeyName(":"); String imageName = "mlflow/" + mlflowParameters.getModelKeyName(":");
String containerName = mlflowParameters.getContainerName();
args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName)); args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName));
args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName));
args.add(mlflowParameters.getDockerComposeEnvCommand()); args.add(mlflowParameters.getDockerComposeEnvCommand());
args.add(MlflowConstants.DOCKER_COMPOSE_RUN); args.add(MlflowConstants.DOCKER_COMPOSE_RUN);
args.add(String.format(MlflowConstants.DOCKER_HEALTH_CHECK_COMMAND, mlflowParameters.getContainerName()));
} }
String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap)); String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
@ -197,6 +207,33 @@ public class MlflowTask extends AbstractTaskExecutor {
} }
public int checkDockerHealth() throws Exception {
logger.info("checking container healthy ... ");
int exitCode = -1;
String[] command = {"sh", "-c", String.format(MlflowConstants.DOCKER_HEALTH_CHECK, mlflowParameters.getContainerName())};
for(int x = 0; x < MlflowConstants.DOCKER_HEALTH_CHECK_TIMEOUT; x = x+1) {
String status;
try {
status = OSUtils.exeShell(command).replace("\n", "").replace("\"", "");
} catch (Exception e) {
status = String.format("error --- %s", e.getMessage());
}
logger.info("container healthy status: {}", status);
if (status.equals("healthy")) {
exitCode = 0;
logger.info("container is healthy");
return exitCode;
}else {
logger.info("The health check has been running for {} seconds", x * MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL / 1000);
ThreadUtils.sleep(MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL);
}
}
logger.info("health check fail");
return exitCode;
}
@Override @Override
public AbstractParameters getParameters() { public AbstractParameters getParameters() {

36
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java

@ -22,6 +22,7 @@ import java.util.UUID;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask;
@ -41,7 +42,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor; import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({ @PrepareForTest({
JSONUtils.class, JSONUtils.class,
@ -129,9 +129,7 @@ public class MlflowTaskTest {
MlflowTask mlflowTask = initTask(createModelDeplyMlflowParameters()); MlflowTask mlflowTask = initTask(createModelDeplyMlflowParameters());
Assert.assertEquals(mlflowTask.buildCommand(), Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
"mlflow models serve -m runs:/a272ec279fc34a8995121ae04281585f/model " + "mlflow models serve -m models:/model/1 --port 7000 -h 0.0.0.0");
"--port 7000 " +
"-h 0.0.0.0");
} }
@Test @Test
@ -139,12 +137,11 @@ public class MlflowTaskTest {
MlflowTask mlflowTask = initTask(createModelDeplyDockerParameters()); MlflowTask mlflowTask = initTask(createModelDeplyDockerParameters());
Assert.assertEquals(mlflowTask.buildCommand(), Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
"mlflow models build-docker -m runs:/a272ec279fc34a8995121ae04281585f/model " + "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" +
"-n mlflow/a272ec279fc34a8995121ae04281585f:model " + "docker rm -f ds-mlflow-model-1\n" +
"--enable-mlserver\n" + "docker run -d --name=ds-mlflow-model-1 -p=7000:8080 " +
"docker rm -f ds-mlflow-a272ec279fc34a8995121ae04281585f-model\n" + "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20 " +
"docker run --name=ds-mlflow-a272ec279fc34a8995121ae04281585f-model " + "mlflow/model:1");
"-p=7000:8080 mlflow/a272ec279fc34a8995121ae04281585f:model");
} }
@Test @Test
@ -154,16 +151,14 @@ public class MlflowTaskTest {
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" + "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
"cp " + mlflowTask.getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE) + "cp " + mlflowTask.getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE) +
" /tmp/dolphinscheduler_test\n" + " /tmp/dolphinscheduler_test\n" +
"mlflow models build-docker -m models:/22222/1 -n mlflow/22222:1 --enable-mlserver\n" + "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" +
"export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/22222:1\n" + "docker rm -f ds-mlflow-model-1\n" +
"export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-22222-1\n" + "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/model:1\n" +
"export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-model-1\n" +
"export DS_TASK_MLFLOW_DEPLOY_PORT=7000\n" + "export DS_TASK_MLFLOW_DEPLOY_PORT=7000\n" +
"export DS_TASK_MLFLOW_CPU_LIMIT=0.5\n" + "export DS_TASK_MLFLOW_CPU_LIMIT=0.5\n" +
"export DS_TASK_MLFLOW_MEMORY_LIMIT=200m\n" + "export DS_TASK_MLFLOW_MEMORY_LIMIT=200m\n" +
"docker-compose up -d\n" + "docker-compose up -d");
"for i in $(seq 1 300); do " +
"[ $(docker inspect --format \"{{json .State.Health.Status }}\" ds-mlflow-22222-1) = '\"healthy\"' ] && exit 0 && break;sleep 1; " +
"done; docker-compose down; exit 1");
} }
private MlflowTask initTask(MlflowParameters mlflowParameters) { private MlflowTask initTask(MlflowParameters mlflowParameters) {
@ -172,7 +167,6 @@ public class MlflowTaskTest {
mlflowTask.init(); mlflowTask.init();
mlflowTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); mlflowTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
return mlflowTask; return mlflowTask;
} }
private MlflowParameters createBasicAlgorithmParameters() { private MlflowParameters createBasicAlgorithmParameters() {
@ -218,7 +212,7 @@ public class MlflowTaskTest {
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model"); mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000"); mlflowParameters.setDeployPort("7000");
return mlflowParameters; return mlflowParameters;
} }
@ -228,7 +222,7 @@ public class MlflowTaskTest {
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model"); mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000"); mlflowParameters.setDeployPort("7000");
return mlflowParameters; return mlflowParameters;
} }
@ -238,7 +232,7 @@ public class MlflowTaskTest {
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("models:/22222/1"); mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000"); mlflowParameters.setDeployPort("7000");
mlflowParameters.setCpuLimit("0.5"); mlflowParameters.setCpuLimit("0.5");
mlflowParameters.setMemoryLimit("200m"); mlflowParameters.setMemoryLimit("200m");

Loading…
Cancel
Save