|
|
|
@ -14,8 +14,12 @@
|
|
|
|
|
* See the License for the specific language governing permissions and |
|
|
|
|
* limitations under the License. |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.common.shell; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.io.BufferedReader; |
|
|
|
|
import java.io.File; |
|
|
|
|
import java.io.IOException; |
|
|
|
@ -27,10 +31,6 @@ import java.util.TimerTask;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* A base class for running a Unix command. |
|
|
|
|
* |
|
|
|
@ -88,13 +88,11 @@ public abstract class AbstractShell {
|
|
|
|
|
* @param interval the minimum duration to wait before re-executing the |
|
|
|
|
* command. |
|
|
|
|
*/ |
|
|
|
|
public AbstractShell(long interval ) { |
|
|
|
|
public AbstractShell(long interval) { |
|
|
|
|
this.interval = interval; |
|
|
|
|
this.lastTime = (interval<0) ? 0 : -interval; |
|
|
|
|
this.lastTime = (interval < 0) ? 0 : -interval; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* set the environment for the command |
|
|
|
|
* @param env Mapping of environment variables |
|
|
|
@ -124,7 +122,6 @@ public abstract class AbstractShell {
|
|
|
|
|
runCommand(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Run a command actual work |
|
|
|
|
*/ |
|
|
|
@ -147,8 +144,7 @@ public abstract class AbstractShell {
|
|
|
|
|
|
|
|
|
|
if (timeOutInterval > 0) { |
|
|
|
|
timeOutTimer = new Timer(); |
|
|
|
|
timeoutTimerTask = new ShellTimeoutTimerTask( |
|
|
|
|
this); |
|
|
|
|
timeoutTimerTask = new ShellTimeoutTimerTask(this); |
|
|
|
|
//One time scheduling.
|
|
|
|
|
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); |
|
|
|
|
} |
|
|
|
@ -167,12 +163,12 @@ public abstract class AbstractShell {
|
|
|
|
|
public void run() { |
|
|
|
|
try { |
|
|
|
|
String line = errReader.readLine(); |
|
|
|
|
while((line != null) && !isInterrupted()) { |
|
|
|
|
while ((line != null) && !isInterrupted()) { |
|
|
|
|
errMsg.append(line); |
|
|
|
|
errMsg.append(System.getProperty("line.separator")); |
|
|
|
|
line = errReader.readLine(); |
|
|
|
|
} |
|
|
|
|
} catch(IOException ioe) { |
|
|
|
|
} catch (IOException ioe) { |
|
|
|
|
logger.warn("Error reading the error stream", ioe); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -191,7 +187,9 @@ public abstract class AbstractShell {
|
|
|
|
|
try { |
|
|
|
|
errThread.start(); |
|
|
|
|
inThread.start(); |
|
|
|
|
} catch (IllegalStateException ise) { } |
|
|
|
|
} catch (IllegalStateException ise) { |
|
|
|
|
logger.warn("Illegal while starting the error and in thread", ise); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
// parse the output
|
|
|
|
|
exitCode = process.waitFor(); |
|
|
|
@ -211,7 +209,7 @@ public abstract class AbstractShell {
|
|
|
|
|
} catch (InterruptedException ie) { |
|
|
|
|
throw new IOException(ie.toString()); |
|
|
|
|
} finally { |
|
|
|
|
if ((timeOutTimer!=null) && !timedOut.get()) { |
|
|
|
|
if ((timeOutTimer != null) && !timedOut.get()) { |
|
|
|
|
timeOutTimer.cancel(); |
|
|
|
|
} |
|
|
|
|
// close the input stream
|
|
|
|
@ -245,8 +243,7 @@ public abstract class AbstractShell {
|
|
|
|
|
* @param lines lines |
|
|
|
|
* @throws IOException errors |
|
|
|
|
*/ |
|
|
|
|
protected abstract void parseExecResult(BufferedReader lines) |
|
|
|
|
throws IOException; |
|
|
|
|
protected abstract void parseExecResult(BufferedReader lines) throws IOException; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get the current sub-process executing the given command |
|
|
|
@ -271,8 +268,6 @@ public abstract class AbstractShell {
|
|
|
|
|
this.timedOut.set(true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Timer which is used to timeout scripts spawned off by shell. |
|
|
|
|
*/ |
|
|
|
@ -321,30 +316,33 @@ public abstract class AbstractShell {
|
|
|
|
|
* process manage container |
|
|
|
|
* |
|
|
|
|
*/ |
|
|
|
|
public static class ProcessContainer extends ConcurrentHashMap<Integer, Process>{ |
|
|
|
|
public static class ProcessContainer extends ConcurrentHashMap<Integer, Process> { |
|
|
|
|
private static final ProcessContainer container = new ProcessContainer(); |
|
|
|
|
private ProcessContainer(){ |
|
|
|
|
|
|
|
|
|
private ProcessContainer() { |
|
|
|
|
super(); |
|
|
|
|
} |
|
|
|
|
public static final ProcessContainer getInstance(){ |
|
|
|
|
|
|
|
|
|
public static final ProcessContainer getInstance() { |
|
|
|
|
return container; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static void putProcess(Process process){ |
|
|
|
|
public static void putProcess(Process process) { |
|
|
|
|
getInstance().put(process.hashCode(), process); |
|
|
|
|
} |
|
|
|
|
public static int processSize(){ |
|
|
|
|
|
|
|
|
|
public static int processSize() { |
|
|
|
|
return getInstance().size(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static void removeProcess(Process process){ |
|
|
|
|
public static void removeProcess(Process process) { |
|
|
|
|
getInstance().remove(process.hashCode()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static void destroyAllProcess(){ |
|
|
|
|
public static void destroyAllProcess() { |
|
|
|
|
Set<Entry<Integer, Process>> set = getInstance().entrySet(); |
|
|
|
|
for (Entry<Integer, Process> entry : set) { |
|
|
|
|
try{ |
|
|
|
|
try { |
|
|
|
|
entry.getValue().destroy(); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("Destroy All Processes error", e); |
|
|
|
|