@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package org.apache.dolphinscheduler.server.worker.task.shell ;
package org.apache.dolphinscheduler.server.worker.task.shell ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters ;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.* ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.common.utils.OSUtils ;
import org.apache.dolphinscheduler.common.utils.ParameterUtils ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask ;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult ;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor ;
import org.slf4j.Logger ;
import java.io.File ;
import java.nio.file.Files ;
@ -43,136 +42,136 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.Map ;
import java.util.Set ;
import org.slf4j.Logger ;
/ * *
* shell task
* /
public class ShellTask extends AbstractTask {
/ * *
* shell parameters
* /
private ShellParameters shellParameters ;
/ * *
* shell command executor
* /
private ShellCommandExecutor shellCommandExecutor ;
/ * *
* taskExecutionContext
* /
private TaskExecutionContext taskExecutionContext ;
/ * *
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* /
public ShellTask ( TaskExecutionContext taskExecutionContext , Logger logger ) {
super ( taskExecutionContext , logger ) ;
this . taskExecutionContext = taskExecutionContext ;
this . shellCommandExecutor = new ShellCommandExecutor ( this : : logHandle ,
/ * *
* shell parameters
* /
private ShellParameters shellParameters ;
/ * *
* shell command executor
* /
private ShellCommandExecutor shellCommandExecutor ;
/ * *
* taskExecutionContext
* /
private TaskExecutionContext taskExecutionContext ;
/ * *
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* /
public ShellTask ( TaskExecutionContext taskExecutionContext , Logger logger ) {
super ( taskExecutionContext , logger ) ;
this . taskExecutionContext = taskExecutionContext ;
this . shellCommandExecutor = new ShellCommandExecutor ( this : : logHandle ,
taskExecutionContext ,
logger ) ;
}
}
@Override
public void init ( ) {
logger . info ( "shell task params {}" , taskExecutionContext . getTaskParams ( ) ) ;
@Override
public void init ( ) {
logger . info ( "shell task params {}" , taskExecutionContext . getTaskParams ( ) ) ;
shellParameters = JSONUtils . parseObject ( taskExecutionContext . getTaskParams ( ) , ShellParameters . class ) ;
shellParameters = JSONUtils . parseObject ( taskExecutionContext . getTaskParams ( ) , ShellParameters . class ) ;
if ( ! shellParameters . checkParameters ( ) ) {
throw new RuntimeException ( "shell task params is not valid" ) ;
}
}
@Override
public void handle ( ) throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = shellCommandExecutor . run ( buildCommand ( ) ) ;
setExitStatusCode ( commandExecuteResult . getExitStatusCode ( ) ) ;
setAppIds ( commandExecuteResult . getAppIds ( ) ) ;
setProcessId ( commandExecuteResult . getProcessId ( ) ) ;
} catch ( Exception e ) {
logger . error ( "shell task error" , e ) ;
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
throw e ;
if ( ! shellParameters . checkParameters ( ) ) {
throw new RuntimeException ( "shell task params is not valid" ) ;
}
}
}
@Override
public void cancelApplication ( boolean cancelApplication ) throws Exception {
// cancel process
shellCommandExecutor . cancelApplication ( ) ;
}
/ * *
* create command
* @return file name
* @throws Exception exception
* /
private String buildCommand ( ) throws Exception {
// generate scripts
String fileName = String . format ( "%s/%s_node.%s" ,
taskExecutionContext . getExecutePath ( ) ,
taskExecutionContext . getTaskAppId ( ) , OSUtils . isWindows ( ) ? "bat" : "sh" ) ;
Path path = new File ( fileName ) . toPath ( ) ;
@Override
public void handle ( ) throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = shellCommandExecutor . run ( buildCommand ( ) ) ;
setExitStatusCode ( commandExecuteResult . getExitStatusCode ( ) ) ;
setAppIds ( commandExecuteResult . getAppIds ( ) ) ;
setProcessId ( commandExecuteResult . getProcessId ( ) ) ;
} catch ( Exception e ) {
logger . error ( "shell task error" , e ) ;
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
throw e ;
}
}
if ( Files . exists ( path ) ) {
return fileName ;
@Override
public void cancelApplication ( boolean cancelApplication ) throws Exception {
// cancel process
shellCommandExecutor . cancelApplication ( ) ;
}
String script = shellParameters . getRawScript ( ) . replaceAll ( "\\r\\n" , "\n" ) ;
/ * *
* combining local and global parameters
* create command
*
* @return file name
* @throws Exception exception
* /
Map < String , Property > paramsMap = ParamUtils . convert ( ParamUtils . getUserDefParamsMap ( taskExecutionContext . getDefinedParams ( ) ) ,
private String buildCommand ( ) throws Exception {
// generate scripts
String fileName = String . format ( "%s/%s_node.%s" ,
taskExecutionContext . getExecutePath ( ) ,
taskExecutionContext . getTaskAppId ( ) , OSUtils . isWindows ( ) ? "bat" : "sh" ) ;
Path path = new File ( fileName ) . toPath ( ) ;
if ( Files . exists ( path ) ) {
return fileName ;
}
String script = shellParameters . getRawScript ( ) . replaceAll ( "\\r\\n" , "\n" ) ;
// combining local and global parameters
Map < String , Property > paramsMap = ParamUtils . convert ( ParamUtils . getUserDefParamsMap ( taskExecutionContext . getDefinedParams ( ) ) ,
taskExecutionContext . getDefinedParams ( ) ,
shellParameters . getLocalParametersMap ( ) ,
CommandType . of ( taskExecutionContext . getCmdTypeIfComplement ( ) ) ,
taskExecutionContext . getScheduleTime ( ) ) ;
if ( paramsMap ! = null ) {
script = ParameterUtils . convertParameterPlaceholders ( script , ParamUtils . convert ( paramsMap ) ) ;
}
// new
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if ( paramsMap ! = null ) {
if ( taskExecutionContext . getScheduleTime ( ) ! = null ) {
String dateTime = DateUtils . format ( taskExecutionContext . getScheduleTime ( ) , Constants . PARAMETER_FORMAT_TIME ) ;
Property p = new Property ( ) ;
p . setValue ( dateTime ) ;
p . setProp ( Constants . PARAMETER_SHECDULE_TIME ) ;
paramsMap . put ( Constants . PARAMETER_SHECDULE_TIME , p ) ;
}
script = ParameterUtils . convertParameterPlaceholders2 ( script , ParamUtils . convert ( paramsMap ) ) ;
}
shellParameters . setRawScript ( script ) ;
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if ( paramsMap ! = null ) {
if ( taskExecutionContext . getScheduleTime ( ) ! = null ) {
String dateTime = DateUtils . format ( taskExecutionContext . getScheduleTime ( ) , Constants . PARAMETER_FORMAT_TIME ) ;
Property p = new Property ( ) ;
p . setValue ( dateTime ) ;
p . setProp ( Constants . PARAMETER_SHECDULE_TIME ) ;
paramsMap . put ( Constants . PARAMETER_SHECDULE_TIME , p ) ;
}
}
logger . info ( "raw script : {}" , shellParameters . getRawScript ( ) ) ;
logger . info ( "task execute path : {}" , taskExecutionContext . getExecutePath ( ) ) ;
script = ParameterUtils . convertParameterPlaceholders2 ( script , ParamUtils . convert ( paramsMap ) ) ;
Set < PosixFilePermission > perms = PosixFilePermissions . fromString ( Constants . RWXR_XR_X ) ;
FileAttribute < Set < PosixFilePermission > > attr = PosixFilePermissions . asFileAttribute ( perms ) ;
shellParameters . setRawScript ( script ) ;
if ( OSUtils . isWindows ( ) ) {
Files . createFile ( path ) ;
} else {
Files . createFile ( path , attr ) ;
}
logger . info ( "raw script : {}" , shellParameters . getRawScript ( ) ) ;
logger . info ( "task execute path : {}" , taskExecutionContext . getExecutePath ( ) ) ;
Files . write ( path , shellParameters . getRawScript ( ) . getBytes ( ) , StandardOpenOption . APPEND ) ;
Set < PosixFilePermission > perms = PosixFilePermissions . fromString ( Constants . RWXR_XR_X ) ;
FileAttribute < Set < PosixFilePermission > > attr = PosixFilePermissions . asFileAttribute ( perms ) ;
return fileName ;
}
if ( OSUtils . isWindows ( ) ) {
Files . createFile ( path ) ;
} else {
Files . createFile ( path , attr ) ;
}
@Override
public AbstractParameters getParameters ( ) {
return shellParameters ;
}
Files . write ( path , shellParameters . getRawScript ( ) . getBytes ( ) , StandardOpenOption . APPEND ) ;
return fileName ;
}
@Override
public AbstractParameters getParameters ( ) {
return shellParameters ;
}
}