|
|
@ -52,7 +52,6 @@ import org.springframework.context.annotation.FilterType; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.ScheduledExecutorService; |
|
|
|
import java.util.concurrent.ScheduledExecutorService; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
@ -255,7 +254,7 @@ public class WorkerServer implements IStoppable { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private Runnable heartBeatThread(){ |
|
|
|
private Runnable heartBeatThread(){ |
|
|
|
logger.info("start worker heart beat thread..."); |
|
|
|
logger.info("start worker heart beat thread..."); |
|
|
|
Runnable heartBeatThread = new Runnable() { |
|
|
|
return new Runnable() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run() { |
|
|
|
// send heartbeat to zk
|
|
|
|
// send heartbeat to zk
|
|
|
@ -266,7 +265,6 @@ public class WorkerServer implements IStoppable { |
|
|
|
zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode() , Constants.WORKER_PREFIX); |
|
|
|
zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode() , Constants.WORKER_PREFIX); |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
return heartBeatThread; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -276,7 +274,7 @@ public class WorkerServer implements IStoppable { |
|
|
|
* @return kill process thread |
|
|
|
* @return kill process thread |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private Runnable getKillProcessThread(){ |
|
|
|
private Runnable getKillProcessThread(){ |
|
|
|
Runnable killProcessThread = new Runnable() { |
|
|
|
return new Runnable() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run() { |
|
|
|
logger.info("start listening kill process thread..."); |
|
|
|
logger.info("start listening kill process thread..."); |
|
|
@ -297,7 +295,6 @@ public class WorkerServer implements IStoppable { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
return killProcessThread; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
@ -307,17 +304,17 @@ public class WorkerServer implements IStoppable { |
|
|
|
* @param pd process dao |
|
|
|
* @param pd process dao |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private void killTask(String taskInfo, ProcessService pd) { |
|
|
|
private void killTask(String taskInfo, ProcessService pd) { |
|
|
|
logger.info("get one kill command from tasks kill queue: " + taskInfo); |
|
|
|
logger.info("get one kill command from tasks kill queue: {}" , taskInfo); |
|
|
|
String[] taskInfoArray = taskInfo.split("-"); |
|
|
|
String[] taskInfoArray = taskInfo.split("-"); |
|
|
|
if(taskInfoArray.length != 2){ |
|
|
|
if(taskInfoArray.length != 2){ |
|
|
|
logger.error("error format kill info: " + taskInfo); |
|
|
|
logger.error("error format kill info: {}", taskInfo); |
|
|
|
return ; |
|
|
|
return ; |
|
|
|
} |
|
|
|
} |
|
|
|
String host = taskInfoArray[0]; |
|
|
|
String host = taskInfoArray[0]; |
|
|
|
int taskInstanceId = Integer.parseInt(taskInfoArray[1]); |
|
|
|
int taskInstanceId = Integer.parseInt(taskInfoArray[1]); |
|
|
|
TaskInstance taskInstance = pd.getTaskInstanceDetailByTaskId(taskInstanceId); |
|
|
|
TaskInstance taskInstance = pd.getTaskInstanceDetailByTaskId(taskInstanceId); |
|
|
|
if(taskInstance == null){ |
|
|
|
if(taskInstance == null){ |
|
|
|
logger.error("cannot find the kill task :" + taskInfo); |
|
|
|
logger.error("cannot find the kill task : {}", taskInfo); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -332,8 +329,7 @@ public class WorkerServer implements IStoppable { |
|
|
|
}else if(!taskInstance.getState().typeIsFinished()){ |
|
|
|
}else if(!taskInstance.getState().typeIsFinished()){ |
|
|
|
ProcessUtils.kill(taskInstance); |
|
|
|
ProcessUtils.kill(taskInstance); |
|
|
|
}else{ |
|
|
|
}else{ |
|
|
|
logger.info("the task aleady finish: task id: " + taskInstance.getId() |
|
|
|
logger.info("the task aleady finish: task id: {} state: {}", taskInstance.getId(), taskInstance.getState()); |
|
|
|
+ " state: " + taskInstance.getState().toString()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -347,7 +343,7 @@ public class WorkerServer implements IStoppable { |
|
|
|
private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessService pd){ |
|
|
|
private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessService pd){ |
|
|
|
// creating distributed locks, lock path /dolphinscheduler/lock/worker
|
|
|
|
// creating distributed locks, lock path /dolphinscheduler/lock/worker
|
|
|
|
InterProcessMutex mutex = null; |
|
|
|
InterProcessMutex mutex = null; |
|
|
|
logger.info("delete task from tasks queue: " + taskInstance.getId()); |
|
|
|
logger.info("delete task from tasks queue: {}", taskInstance.getId()); |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
try { |
|
|
|
mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(), |
|
|
|
mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(), |
|
|
|