Browse Source
* Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates * add- register processor Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * Refactor worker (#2018) * Refactor worker (#7) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates * add- register processor Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * Refactor worker (#8) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * updates * add- register processor Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * add kill command Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com> * add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>pull/2/head
Tboy
5 years ago
committed by
GitHub
34 changed files with 678 additions and 257 deletions
@ -1 +1 @@ |
|||||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
} |
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
", endTime=" + endTime +
'}';
}
} |
@ -0,0 +1,64 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.cache; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||||
|
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; |
||||||
|
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; |
||||||
|
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; |
||||||
|
|
||||||
|
/** |
||||||
|
* task instance state manager |
||||||
|
*/ |
||||||
|
public interface TaskInstanceCacheManager { |
||||||
|
|
||||||
|
/** |
||||||
|
* get taskInstance by taskInstance id |
||||||
|
* |
||||||
|
* @param taskInstanceId taskInstanceId |
||||||
|
* @return taskInstance |
||||||
|
*/ |
||||||
|
TaskInstance getByTaskInstanceId(Integer taskInstanceId); |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param taskExecutionContext taskExecutionContext |
||||||
|
*/ |
||||||
|
void cacheTaskInstance(TaskExecutionContext taskExecutionContext); |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param taskAckCommand taskAckCommand |
||||||
|
*/ |
||||||
|
void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand); |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param executeTaskResponseCommand executeTaskResponseCommand |
||||||
|
*/ |
||||||
|
void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand); |
||||||
|
|
||||||
|
/** |
||||||
|
* remove taskInstance by taskInstanceId |
||||||
|
* @param taskInstanceId taskInstanceId |
||||||
|
*/ |
||||||
|
void removeByTaskInstanceId(Integer taskInstanceId); |
||||||
|
} |
@ -0,0 +1,115 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.apache.dolphinscheduler.server.master.cache.impl; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||||
|
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; |
||||||
|
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; |
||||||
|
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; |
||||||
|
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; |
||||||
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
import java.util.Map; |
||||||
|
import java.util.concurrent.ConcurrentHashMap; |
||||||
|
|
||||||
|
/** |
||||||
|
* taskInstance state manager |
||||||
|
*/ |
||||||
|
@Component |
||||||
|
public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { |
||||||
|
|
||||||
|
/** |
||||||
|
* taskInstance caceh |
||||||
|
*/ |
||||||
|
private Map<Integer,TaskInstance> taskInstanceCache = new ConcurrentHashMap<>(); |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* get taskInstance by taskInstance id |
||||||
|
* |
||||||
|
* @param taskInstanceId taskInstanceId |
||||||
|
* @return taskInstance |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public TaskInstance getByTaskInstanceId(Integer taskInstanceId) { |
||||||
|
return taskInstanceCache.get(taskInstanceId); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param taskExecutionContext taskExecutionContext |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) { |
||||||
|
TaskInstance taskInstance = getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); |
||||||
|
if (taskInstance == null){ |
||||||
|
taskInstance = new TaskInstance(); |
||||||
|
} |
||||||
|
taskInstance.setId(taskExecutionContext.getTaskInstanceId()); |
||||||
|
taskInstance.setName(taskExecutionContext.getTaskName()); |
||||||
|
taskInstance.setStartTime(taskExecutionContext.getStartTime()); |
||||||
|
taskInstance.setTaskType(taskInstance.getTaskType()); |
||||||
|
taskInstance.setExecutePath(taskInstance.getExecutePath()); |
||||||
|
taskInstance.setTaskJson(taskInstance.getTaskJson()); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param taskAckCommand taskAckCommand |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) { |
||||||
|
TaskInstance taskInstance = getByTaskInstanceId(taskAckCommand.getTaskInstanceId()); |
||||||
|
if (taskInstance == null){ |
||||||
|
taskInstance = new TaskInstance(); |
||||||
|
} |
||||||
|
taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus())); |
||||||
|
taskInstance.setStartTime(taskAckCommand.getStartTime()); |
||||||
|
taskInstance.setHost(taskAckCommand.getHost()); |
||||||
|
taskInstance.setExecutePath(taskAckCommand.getExecutePath()); |
||||||
|
taskInstance.setLogPath(taskAckCommand.getLogPath()); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* cache taskInstance |
||||||
|
* |
||||||
|
* @param executeTaskResponseCommand executeTaskResponseCommand |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) { |
||||||
|
TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId()); |
||||||
|
if (taskInstance == null){ |
||||||
|
taskInstance = new TaskInstance(); |
||||||
|
} |
||||||
|
taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus())); |
||||||
|
taskInstance.setEndTime(executeTaskResponseCommand.getEndTime()); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* remove taskInstance by taskInstanceId |
||||||
|
* @param taskInstanceId taskInstanceId |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public void removeByTaskInstanceId(Integer taskInstanceId) { |
||||||
|
taskInstanceCache.remove(taskInstanceId); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,69 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.task; |
||||||
|
|
||||||
|
/** |
||||||
|
* command execute result |
||||||
|
*/ |
||||||
|
public class CommandExecuteResult { |
||||||
|
|
||||||
|
/** |
||||||
|
* command exit code |
||||||
|
*/ |
||||||
|
private Integer exitStatusCode; |
||||||
|
|
||||||
|
/** |
||||||
|
* appIds |
||||||
|
*/ |
||||||
|
private String appIds; |
||||||
|
|
||||||
|
/** |
||||||
|
* process id |
||||||
|
*/ |
||||||
|
private Integer processId; |
||||||
|
|
||||||
|
|
||||||
|
public CommandExecuteResult(){ |
||||||
|
this.exitStatusCode = 0; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
public Integer getExitStatusCode() { |
||||||
|
return exitStatusCode; |
||||||
|
} |
||||||
|
|
||||||
|
public void setExitStatusCode(Integer exitStatusCode) { |
||||||
|
this.exitStatusCode = exitStatusCode; |
||||||
|
} |
||||||
|
|
||||||
|
public String getAppIds() { |
||||||
|
return appIds; |
||||||
|
} |
||||||
|
|
||||||
|
public void setAppIds(String appIds) { |
||||||
|
this.appIds = appIds; |
||||||
|
} |
||||||
|
|
||||||
|
public Integer getProcessId() { |
||||||
|
return processId; |
||||||
|
} |
||||||
|
|
||||||
|
public void setProcessId(Integer processId) { |
||||||
|
this.processId = processId; |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue