Browse Source

Merge branch 'refactor-worker' into refactor-worker

pull/2/head
Tboy 4 years ago committed by GitHub
parent
commit
9f1865b74a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 250
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  3. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

250
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
import java.util.Date;
/**
* master/worker task transport
*/
public class TaskInfo implements Serializable{
/**
* task instance id
*/
private Integer taskId;
/**
* taks name
*/
private String taskName;
/**
* task start time
*/
private Date startTime;
/**
* task type
*/
private String taskType;
/**
* task execute path
*/
private String executePath;
/**
* task json
*/
private String taskJson;
/**
* process instance id
*/
private Integer processInstanceId;
/**
* process instance schedule time
*/
private Date scheduleTime;
/**
* process instance global parameters
*/
private String globalParams;
/**
* execute user id
*/
private Integer executorId;
/**
* command type if complement
*/
private Integer cmdTypeIfComplement;
/**
* tenant code
*/
private String tenantCode;
/**
* task queue
*/
private String queue;
/**
* process define id
*/
private Integer processDefineId;
/**
* project id
*/
private Integer projectId;
public Integer getTaskId() {
return taskId;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public Integer getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public Integer getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(Integer processDefineId) {
this.processDefineId = processDefineId;
}
public Integer getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
this.projectId = projectId;
}
public Integer getExecutorId() {
return executorId;
}
public void setExecutorId(Integer executorId) {
this.executorId = executorId;
}
public Integer getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
@Override
public String toString() {
return "TaskInfo{" +
"taskId=" + taskId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
", executePath='" + executePath + '\'' +
", taskJson='" + taskJson + '\'' +
", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime +
", globalParams='" + globalParams + '\'' +
", executorId=" + executorId +
", cmdTypeIfComplement=" + cmdTypeIfComplement +
", tenantCode='" + tenantCode + '\'' +
", queue='" + queue + '\'' +
", processDefineId=" + processDefineId +
", projectId=" + projectId +
'}';
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -155,8 +155,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
/**
* set task instance relation
*

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -92,7 +92,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
new CallbackChannel(channel, command.getOpaque()));
@ -104,7 +103,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -80,7 +80,6 @@ public class TaskScheduleThread implements Runnable {
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService
@ -177,7 +176,6 @@ public class TaskScheduleThread implements Runnable {
// global params string
String globalParamsStr = taskExecutionContext.getGlobalParams();
if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));

Loading…
Cancel
Save