Browse Source

host add host:port format (#2070)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format
pull/2/head
qiaozhanwei 4 years ago committed by GitHub
parent
commit
72e8f8d195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  2. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  3. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  4. 139
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  5. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  6. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  7. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -68,7 +69,7 @@ public class LoggerService {
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
} }
String host = taskInstance.getHost(); String host = Host.of(taskInstance.getHost()).getIp();
if(StringUtils.isEmpty(host)){ if(StringUtils.isEmpty(host)){
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
} }
@ -94,7 +95,7 @@ public class LoggerService {
if (taskInstance == null){ if (taskInstance == null){
throw new RuntimeException("task instance is null"); throw new RuntimeException("task instance is null");
} }
String host = taskInstance.getHost(); String host = Host.of(taskInstance.getHost()).getIp();
return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()); return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
} }
} }

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -186,21 +186,11 @@ public class TaskInstance implements Serializable {
@TableField(exist = false) @TableField(exist = false)
private String dependentResult; private String dependentResult;
/** /**
* worker group id * worker group id
*/ */
private int workerGroupId; private int workerGroupId;
public void init(String host,Date startTime,String executePath){
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
}
public ProcessInstance getProcessInstance() { public ProcessInstance getProcessInstance() {
return processInstance; return processInstance;
} }

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -48,7 +48,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return this; return this;
} }

139
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -29,7 +29,7 @@ public class TaskExecutionContext implements Serializable{
/** /**
* task id * task id
*/ */
private Integer taskInstanceId; private int taskInstanceId;
/** /**
@ -51,7 +51,7 @@ public class TaskExecutionContext implements Serializable{
* host * host
*/ */
private String host; private String host;
/** /**
* task execute path * task execute path
*/ */
@ -70,7 +70,7 @@ public class TaskExecutionContext implements Serializable{
/** /**
* processId * processId
*/ */
private Integer processId; private int processId;
/** /**
* appIds * appIds
@ -80,7 +80,7 @@ public class TaskExecutionContext implements Serializable{
/** /**
* process instance id * process instance id
*/ */
private Integer processInstanceId; private int processInstanceId;
/** /**
@ -97,13 +97,13 @@ public class TaskExecutionContext implements Serializable{
/** /**
* execute user id * execute user id
*/ */
private Integer executorId; private int executorId;
/** /**
* command type if complement * command type if complement
*/ */
private Integer cmdTypeIfComplement; private int cmdTypeIfComplement;
/** /**
@ -120,12 +120,12 @@ public class TaskExecutionContext implements Serializable{
/** /**
* process define id * process define id
*/ */
private Integer processDefineId; private int processDefineId;
/** /**
* project id * project id
*/ */
private Integer projectId; private int projectId;
/** /**
* taskParams * taskParams
@ -173,22 +173,11 @@ public class TaskExecutionContext implements Serializable{
*/ */
private DataxTaskExecutionContext dataxTaskExecutionContext; private DataxTaskExecutionContext dataxTaskExecutionContext;
public int getTaskInstanceId() {
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public Integer getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
} }
public void setTaskInstanceId(Integer taskInstanceId) { public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId; this.taskInstanceId = taskInstanceId;
} }
@ -216,6 +205,14 @@ public class TaskExecutionContext implements Serializable{
this.taskType = taskType; this.taskType = taskType;
} }
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getExecutePath() { public String getExecutePath() {
return executePath; return executePath;
} }
@ -224,6 +221,14 @@ public class TaskExecutionContext implements Serializable{
this.executePath = executePath; this.executePath = executePath;
} }
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskJson() { public String getTaskJson() {
return taskJson; return taskJson;
} }
@ -232,11 +237,27 @@ public class TaskExecutionContext implements Serializable{
this.taskJson = taskJson; this.taskJson = taskJson;
} }
public Integer getProcessInstanceId() { public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessInstanceId() {
return processInstanceId; return processInstanceId;
} }
public void setProcessInstanceId(Integer processInstanceId) { public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId; this.processInstanceId = processInstanceId;
} }
@ -256,6 +277,22 @@ public class TaskExecutionContext implements Serializable{
this.globalParams = globalParams; this.globalParams = globalParams;
} }
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public int getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() { public String getTenantCode() {
return tenantCode; return tenantCode;
} }
@ -272,46 +309,22 @@ public class TaskExecutionContext implements Serializable{
this.queue = queue; this.queue = queue;
} }
public Integer getProcessDefineId() { public int getProcessDefineId() {
return processDefineId; return processDefineId;
} }
public void setProcessDefineId(Integer processDefineId) { public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId; this.processDefineId = processDefineId;
} }
public Integer getProjectId() { public int getProjectId() {
return projectId; return projectId;
} }
public void setProjectId(Integer projectId) { public void setProjectId(int projectId) {
this.projectId = projectId; this.projectId = projectId;
} }
public Integer getExecutorId() {
return executorId;
}
public void setExecutorId(Integer executorId) {
this.executorId = executorId;
}
public Integer getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskParams() { public String getTaskParams() {
return taskParams; return taskParams;
} }
@ -360,28 +373,12 @@ public class TaskExecutionContext implements Serializable{
this.taskTimeout = taskTimeout; this.taskTimeout = taskTimeout;
} }
public String getHost() { public String getWorkerGroup() {
return host; return workerGroup;
}
public void setHost(String host) {
this.host = host;
}
public Integer getProcessId() {
return processId;
}
public void setProcessId(Integer processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
} }
public void setAppIds(String appIds) { public void setWorkerGroup(String workerGroup) {
this.appIds = appIds; this.workerGroup = workerGroup;
} }
public SQLTaskExecutionContext getSqlTaskExecutionContext() { public SQLTaskExecutionContext getSqlTaskExecutionContext() {

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -90,6 +90,7 @@ public class ExecutorDispatcher implements InitializingBean {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext())); throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
} }
context.setHost(host); context.setHost(host);
context.getContext().setHost(host.getAddress());
executorManager.beforeExecute(context); executorManager.beforeExecute(context);
try { try {
/** /**

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -186,9 +186,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
Host host = new Host(); Host host = Host.of(taskInstance.getHost());
host.setIp(taskInstance.getHost());
host.setPort(12346);
executionContext.setHost(host); executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext); nettyExecutorManager.executeDirectly(executionContext);

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -78,10 +78,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType())); String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class); command.getBody(), TaskExecuteRequestCommand.class);
logger.info("received command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext(); String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
@ -141,7 +143,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setHost(OSUtils.getHost()); ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null); ackCommand.setExecutePath(null);

Loading…
Cancel
Save