@ -78,6 +78,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings ;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory . getLogger ( UpgradeDao . class ) ;
private static final String T_VERSION_NAME = "t_escheduler_version" ;
private static final String T_NEW_VERSION_NAME = "t_ds_version" ;
@ -198,7 +199,8 @@ public abstract class UpgradeDao {
Map < Integer , String > replaceProcessDefinitionMap = new HashMap < > ( ) ;
try {
Map < Integer , String > oldWorkerGroupMap = workerGroupDao . queryAllOldWorkerGroup ( dataSource . getConnection ( ) ) ;
Map < Integer , String > processDefinitionJsonMap = processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
Map < Integer , String > processDefinitionJsonMap =
processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
for ( Map . Entry < Integer , String > entry : processDefinitionJsonMap . entrySet ( ) ) {
ObjectNode jsonObject = JSONUtils . parseObject ( entry . getValue ( ) ) ;
@ -225,7 +227,8 @@ public abstract class UpgradeDao {
replaceProcessDefinitionMap . put ( entry . getKey ( ) , jsonObject . toString ( ) ) ;
}
if ( replaceProcessDefinitionMap . size ( ) > 0 ) {
processDefinitionDao . updateProcessDefinitionJson ( dataSource . getConnection ( ) , replaceProcessDefinitionMap ) ;
processDefinitionDao . updateProcessDefinitionJson ( dataSource . getConnection ( ) ,
replaceProcessDefinitionMap ) ;
}
} catch ( Exception e ) {
logger . error ( "update process definition json workergroup error" , e ) ;
@ -238,7 +241,8 @@ public abstract class UpgradeDao {
Map < Integer , String > replaceProcessDefinitionMap = new HashMap < > ( ) ;
try {
Map < String , Integer > resourcesMap = resourceDao . listAllResources ( dataSource . getConnection ( ) ) ;
Map < Integer , String > processDefinitionJsonMap = processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
Map < Integer , String > processDefinitionJsonMap =
processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
for ( Map . Entry < Integer , String > entry : processDefinitionJsonMap . entrySet ( ) ) {
ObjectNode jsonObject = JSONUtils . parseObject ( entry . getValue ( ) ) ;
@ -249,10 +253,13 @@ public abstract class UpgradeDao {
ObjectNode param = ( ObjectNode ) task . get ( "params" ) ;
if ( param ! = null ) {
List < ResourceInfo > resourceList = JSONUtils . toList ( param . get ( "resourceList" ) . toString ( ) , ResourceInfo . class ) ;
ResourceInfo mainJar = JSONUtils . parseObject ( param . get ( "mainJar" ) . toString ( ) , ResourceInfo . class ) ;
List < ResourceInfo > resourceList =
JSONUtils . toList ( param . get ( "resourceList" ) . toString ( ) , ResourceInfo . class ) ;
ResourceInfo mainJar =
JSONUtils . parseObject ( param . get ( "mainJar" ) . toString ( ) , ResourceInfo . class ) ;
if ( mainJar ! = null & & mainJar . getId ( ) = = null ) {
String fullName = mainJar . getRes ( ) . startsWith ( "/" ) ? mainJar . getRes ( ) : String . format ( "/%s" , mainJar . getRes ( ) ) ;
String fullName = mainJar . getRes ( ) . startsWith ( "/" ) ? mainJar . getRes ( )
: String . format ( "/%s" , mainJar . getRes ( ) ) ;
if ( resourcesMap . containsKey ( fullName ) ) {
mainJar . setId ( resourcesMap . get ( fullName ) ) ;
param . put ( "mainJar" , JSONUtils . parseObject ( JSONUtils . toJsonString ( mainJar ) ) ) ;
@ -261,7 +268,8 @@ public abstract class UpgradeDao {
if ( CollectionUtils . isNotEmpty ( resourceList ) ) {
List < ResourceInfo > newResourceList = resourceList . stream ( ) . map ( resInfo - > {
String fullName = resInfo . getRes ( ) . startsWith ( "/" ) ? resInfo . getRes ( ) : String . format ( "/%s" , resInfo . getRes ( ) ) ;
String fullName = resInfo . getRes ( ) . startsWith ( "/" ) ? resInfo . getRes ( )
: String . format ( "/%s" , resInfo . getRes ( ) ) ;
if ( resInfo . getId ( ) = = null & & resourcesMap . containsKey ( fullName ) ) {
resInfo . setId ( resourcesMap . get ( fullName ) ) ;
}
@ -281,7 +289,8 @@ public abstract class UpgradeDao {
replaceProcessDefinitionMap . put ( entry . getKey ( ) , jsonObject . toString ( ) ) ;
}
if ( replaceProcessDefinitionMap . size ( ) > 0 ) {
processDefinitionDao . updateProcessDefinitionJson ( dataSource . getConnection ( ) , replaceProcessDefinitionMap ) ;
processDefinitionDao . updateProcessDefinitionJson ( dataSource . getConnection ( ) ,
replaceProcessDefinitionMap ) ;
}
} catch ( Exception e ) {
logger . error ( "update process definition json resource list error" , e ) ;
@ -291,7 +300,8 @@ public abstract class UpgradeDao {
private void upgradeDolphinSchedulerDML ( String schemaDir ) {
String schemaVersion = schemaDir . split ( "_" ) [ 0 ] ;
Resource sqlFilePath = new ClassPathResource ( String . format ( "sql/upgrade/%s/%s/dolphinscheduler_dml.sql" , schemaDir , getDbType ( ) . name ( ) . toLowerCase ( ) ) ) ;
Resource sqlFilePath = new ClassPathResource ( String . format ( "sql/upgrade/%s/%s/dolphinscheduler_dml.sql" ,
schemaDir , getDbType ( ) . name ( ) . toLowerCase ( ) ) ) ;
logger . info ( "sqlSQLFilePath: {}" , sqlFilePath ) ;
Connection conn = null ;
PreparedStatement pstmt = null ;
@ -355,7 +365,8 @@ public abstract class UpgradeDao {
* @param schemaDir schemaDir
* /
private void upgradeDolphinSchedulerDDL ( String schemaDir , String scriptFile ) {
Resource sqlFilePath = new ClassPathResource ( String . format ( "sql/upgrade/%s/%s/%s" , schemaDir , getDbType ( ) . name ( ) . toLowerCase ( ) , scriptFile ) ) ;
Resource sqlFilePath = new ClassPathResource (
String . format ( "sql/upgrade/%s/%s/%s" , schemaDir , getDbType ( ) . name ( ) . toLowerCase ( ) , scriptFile ) ) ;
Connection conn = null ;
PreparedStatement pstmt = null ;
try {
@ -423,21 +434,26 @@ public abstract class UpgradeDao {
projectDao . updateProjectCode ( dataSource . getConnection ( ) , projectIdCodeMap ) ;
// execute process definition code
List < ProcessDefinition > processDefinitions = processDefinitionDao . queryProcessDefinition ( dataSource . getConnection ( ) ) ;
processDefinitionDao . updateProcessDefinitionCode ( dataSource . getConnection ( ) , processDefinitions , projectIdCodeMap ) ;
List < ProcessDefinition > processDefinitions =
processDefinitionDao . queryProcessDefinition ( dataSource . getConnection ( ) ) ;
processDefinitionDao . updateProcessDefinitionCode ( dataSource . getConnection ( ) , processDefinitions ,
projectIdCodeMap ) ;
// execute schedule
Map < Integer , Long > allSchedule = scheduleDao . queryAllSchedule ( dataSource . getConnection ( ) ) ;
Map < Integer , Long > processIdCodeMap = processDefinitions . stream ( ) . collect ( Collectors . toMap ( ProcessDefinition : : getId , ProcessDefinition : : getCode ) ) ;
Map < Integer , Long > processIdCodeMap = processDefinitions . stream ( )
. collect ( Collectors . toMap ( ProcessDefinition : : getId , ProcessDefinition : : getCode ) ) ;
scheduleDao . updateScheduleCode ( dataSource . getConnection ( ) , allSchedule , processIdCodeMap ) ;
// json split
Map < Integer , String > processDefinitionJsonMap = processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
Map < Integer , String > processDefinitionJsonMap =
processDefinitionDao . queryAllProcessDefinition ( dataSource . getConnection ( ) ) ;
List < ProcessDefinitionLog > processDefinitionLogs = new ArrayList < > ( ) ;
List < ProcessTaskRelationLog > processTaskRelationLogs = new ArrayList < > ( ) ;
List < TaskDefinitionLog > taskDefinitionLogs = new ArrayList < > ( ) ;
Map < Integer , Map < Long , Map < String , Long > > > processTaskMap = new HashMap < > ( ) ;
splitProcessDefinitionJson ( processDefinitions , processDefinitionJsonMap , processDefinitionLogs , processTaskRelationLogs , taskDefinitionLogs , processTaskMap ) ;
splitProcessDefinitionJson ( processDefinitions , processDefinitionJsonMap , processDefinitionLogs ,
processTaskRelationLogs , taskDefinitionLogs , processTaskMap ) ;
convertDependence ( taskDefinitionLogs , projectIdCodeMap , processTaskMap ) ;
// execute json split
@ -465,7 +481,8 @@ public abstract class UpgradeDao {
ObjectNode jsonObject = JSONUtils . parseObject ( entry . getValue ( ) ) ;
ProcessDefinition processDefinition = processDefinitionMap . get ( entry . getKey ( ) ) ;
if ( processDefinition ! = null ) {
processDefinition . setTenantId ( jsonObject . get ( "tenantId" ) = = null ? - 1 : jsonObject . get ( "tenantId" ) . asInt ( ) ) ;
processDefinition
. setTenantId ( jsonObject . get ( "tenantId" ) = = null ? - 1 : jsonObject . get ( "tenantId" ) . asInt ( ) ) ;
processDefinition . setTimeout ( jsonObject . get ( "timeout" ) . asInt ( ) ) ;
processDefinition . setGlobalParams ( jsonObject . get ( "globalParams" ) . toString ( ) ) ;
} else {
@ -485,8 +502,10 @@ public abstract class UpgradeDao {
if ( param ! = null ) {
JsonNode resourceJsonNode = param . get ( "resourceList" ) ;
if ( resourceJsonNode ! = null & & ! resourceJsonNode . isEmpty ( ) ) {
List < ResourceInfo > resourceList = JSONUtils . toList ( param . get ( "resourceList" ) . toString ( ) , ResourceInfo . class ) ;
List < Integer > resourceIds = resourceList . stream ( ) . map ( ResourceInfo : : getId ) . collect ( Collectors . toList ( ) ) ;
List < ResourceInfo > resourceList =
JSONUtils . toList ( param . get ( "resourceList" ) . toString ( ) , ResourceInfo . class ) ;
List < Integer > resourceIds =
resourceList . stream ( ) . map ( ResourceInfo : : getId ) . collect ( Collectors . toList ( ) ) ;
taskDefinitionLog . setResourceIds ( Joiner . on ( Constants . COMMA ) . join ( resourceIds ) ) ;
} else {
taskDefinitionLog . setResourceIds ( "" ) ;
@ -494,7 +513,8 @@ public abstract class UpgradeDao {
if ( TASK_TYPE_SUB_PROCESS . equals ( taskType ) ) {
JsonNode jsonNodeDefinitionId = param . get ( "processDefinitionId" ) ;
if ( jsonNodeDefinitionId ! = null ) {
param . put ( "processDefinitionCode" , processDefinitionMap . get ( jsonNodeDefinitionId . asInt ( ) ) . getCode ( ) ) ;
param . put ( "processDefinitionCode" ,
processDefinitionMap . get ( jsonNodeDefinitionId . asInt ( ) ) . getCode ( ) ) ;
param . remove ( "processDefinitionId" ) ;
}
}
@ -502,23 +522,29 @@ public abstract class UpgradeDao {
param . put ( "dependence" , task . get ( "dependence" ) ) ;
taskDefinitionLog . setTaskParams ( JSONUtils . toJsonString ( param ) ) ;
}
TaskTimeoutParameter timeout = JSONUtils . parseObject ( JSONUtils . toJsonString ( task . get ( "timeout" ) ) , TaskTimeoutParameter . class ) ;
TaskTimeoutParameter timeout =
JSONUtils . parseObject ( JSONUtils . toJsonString ( task . get ( "timeout" ) ) , TaskTimeoutParameter . class ) ;
if ( timeout ! = null ) {
taskDefinitionLog . setTimeout ( timeout . getInterval ( ) ) ;
taskDefinitionLog . setTimeoutFlag ( timeout . getEnable ( ) ? TimeoutFlag . OPEN : TimeoutFlag . CLOSE ) ;
taskDefinitionLog . setTimeoutNotifyStrategy ( timeout . getStrategy ( ) ) ;
}
String desc = task . get ( "description" ) ! = null ? task . get ( "description" ) . asText ( ) :
task . get ( "desc" ) ! = null ? task . get ( "desc" ) . asText ( ) : "" ;
String desc = task . get ( "description" ) ! = null ? task . get ( "description" ) . asText ( )
: task . get ( "desc" ) ! = null ? task . get ( "desc" ) . asText ( ) : "" ;
taskDefinitionLog . setDescription ( desc ) ;
taskDefinitionLog . setFlag ( Constants . FLOWNODE_RUN_FLAG_NORMAL . equals ( task . get ( "runFlag" ) . asText ( ) ) ? Flag . YES : Flag . NO ) ;
taskDefinitionLog . setFlag (
Constants . FLOWNODE_RUN_FLAG_NORMAL . equals ( task . get ( "runFlag" ) . asText ( ) ) ? Flag . YES : Flag . NO ) ;
taskDefinitionLog . setTaskType ( taskType ) ;
taskDefinitionLog . setFailRetryInterval ( TASK_TYPE_SUB_PROCESS . equals ( taskType ) ? 1 : task . get ( "retryInterval" ) . asInt ( ) ) ;
taskDefinitionLog . setFailRetryTimes ( TASK_TYPE_SUB_PROCESS . equals ( taskType ) ? 0 : task . get ( "maxRetryTimes" ) . asInt ( ) ) ;
taskDefinitionLog . setTaskPriority ( JSONUtils . parseObject ( JSONUtils . toJsonString ( task . get ( "taskInstancePriority" ) ) , Priority . class ) ) ;
taskDefinitionLog . setFailRetryInterval (
TASK_TYPE_SUB_PROCESS . equals ( taskType ) ? 1 : task . get ( "retryInterval" ) . asInt ( ) ) ;
taskDefinitionLog . setFailRetryTimes (
TASK_TYPE_SUB_PROCESS . equals ( taskType ) ? 0 : task . get ( "maxRetryTimes" ) . asInt ( ) ) ;
taskDefinitionLog . setTaskPriority ( JSONUtils
. parseObject ( JSONUtils . toJsonString ( task . get ( "taskInstancePriority" ) ) , Priority . class ) ) ;
String name = task . get ( "name" ) . asText ( ) ;
taskDefinitionLog . setName ( name ) ;
taskDefinitionLog . setWorkerGroup ( task . get ( "workerGroup" ) = = null ? "default" : task . get ( "workerGroup" ) . asText ( ) ) ;
taskDefinitionLog
. setWorkerGroup ( task . get ( "workerGroup" ) = = null ? "default" : task . get ( "workerGroup" ) . asText ( ) ) ;
long taskCode = CodeGenerateUtils . getInstance ( ) . genCode ( ) ;
taskDefinitionLog . setCode ( taskCode ) ;
taskDefinitionLog . setVersion ( Constants . VERSION_FIRST ) ;
@ -550,14 +576,16 @@ public abstract class UpgradeDao {
}
}
public void convertConditions ( List < TaskDefinitionLog > taskDefinitionLogList , Map < String , Long > taskNameCodeMap ) throws Exception {
public void convertConditions ( List < TaskDefinitionLog > taskDefinitionLogList ,
Map < String , Long > taskNameCodeMap ) throws Exception {
for ( TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList ) {
if ( TASK_TYPE_CONDITIONS . equals ( taskDefinitionLog . getTaskType ( ) ) ) {
ObjectMapper objectMapper = new ObjectMapper ( ) ;
ObjectNode taskParams = JSONUtils . parseObject ( taskDefinitionLog . getTaskParams ( ) ) ;
// reset conditionResult
ObjectNode conditionResult = ( ObjectNode ) taskParams . get ( "conditionResult" ) ;
List < String > successNode = JSONUtils . toList ( conditionResult . get ( "successNode" ) . toString ( ) , String . class ) ;
List < String > successNode =
JSONUtils . toList ( conditionResult . get ( "successNode" ) . toString ( ) , String . class ) ;
List < Long > nodeCode = new ArrayList < > ( ) ;
successNode . forEach ( node - > nodeCode . add ( taskNameCodeMap . get ( node ) ) ) ;
conditionResult . set ( "successNode" , objectMapper . readTree ( objectMapper . writeValueAsString ( nodeCode ) ) ) ;
@ -567,10 +595,12 @@ public abstract class UpgradeDao {
conditionResult . set ( "failedNode" , objectMapper . readTree ( objectMapper . writeValueAsString ( nodeCode ) ) ) ;
// reset dependItemList
ObjectNode dependence = ( ObjectNode ) taskParams . get ( "dependence" ) ;
ArrayNode dependTaskList = JSONUtils . parseArray ( JSONUtils . toJsonString ( dependence . get ( "dependTaskList" ) ) ) ;
ArrayNode dependTaskList =
JSONUtils . parseArray ( JSONUtils . toJsonString ( dependence . get ( "dependTaskList" ) ) ) ;
for ( int i = 0 ; i < dependTaskList . size ( ) ; i + + ) {
ObjectNode dependTask = ( ObjectNode ) dependTaskList . path ( i ) ;
ArrayNode dependItemList = JSONUtils . parseArray ( JSONUtils . toJsonString ( dependTask . get ( "dependItemList" ) ) ) ;
ArrayNode dependItemList =
JSONUtils . parseArray ( JSONUtils . toJsonString ( dependTask . get ( "dependItemList" ) ) ) ;
for ( int j = 0 ; j < dependItemList . size ( ) ; j + + ) {
ObjectNode dependItem = ( ObjectNode ) dependItemList . path ( j ) ;
JsonNode depTasks = dependItem . get ( "depTasks" ) ;
@ -591,7 +621,8 @@ public abstract class UpgradeDao {
if ( Strings . isNullOrEmpty ( locations ) ) {
return locations ;
}
Map < String , ObjectNode > locationsMap = JSONUtils . parseObject ( locations , new TypeReference < Map < String , ObjectNode > > ( ) {
Map < String , ObjectNode > locationsMap =
JSONUtils . parseObject ( locations , new TypeReference < Map < String , ObjectNode > > ( ) {
} ) ;
if ( locationsMap = = null ) {
return locations ;
@ -615,26 +646,33 @@ public abstract class UpgradeDao {
if ( TASK_TYPE_DEPENDENT . equals ( taskDefinitionLog . getTaskType ( ) ) ) {
ObjectNode taskParams = JSONUtils . parseObject ( taskDefinitionLog . getTaskParams ( ) ) ;
ObjectNode dependence = ( ObjectNode ) taskParams . get ( "dependence" ) ;
ArrayNode dependTaskList = JSONUtils . parseArray ( JSONUtils . toJsonString ( dependence . get ( "dependTaskList" ) ) ) ;
ArrayNode dependTaskList =
JSONUtils . parseArray ( JSONUtils . toJsonString ( dependence . get ( "dependTaskList" ) ) ) ;
for ( int i = 0 ; i < dependTaskList . size ( ) ; i + + ) {
ObjectNode dependTask = ( ObjectNode ) dependTaskList . path ( i ) ;
ArrayNode dependItemList = JSONUtils . parseArray ( JSONUtils . toJsonString ( dependTask . get ( "dependItemList" ) ) ) ;
ArrayNode dependItemList =
JSONUtils . parseArray ( JSONUtils . toJsonString ( dependTask . get ( "dependItemList" ) ) ) ;
for ( int j = 0 ; j < dependItemList . size ( ) ; j + + ) {
ObjectNode dependItem = ( ObjectNode ) dependItemList . path ( j ) ;
dependItem . put ( "projectCode" , projectIdCodeMap . get ( dependItem . get ( "projectId" ) . asInt ( ) ) ) ;
int definitionId = dependItem . get ( "definitionId" ) . asInt ( ) ;
Map < Long , Map < String , Long > > processCodeTaskNameCodeMap = processTaskMap . get ( definitionId ) ;
if ( processCodeTaskNameCodeMap = = null ) {
logger . warn ( "We can't find processDefinition [{}], please check it is not exist, remove this dependence" , definitionId ) ;
logger . warn (
"We can't find processDefinition [{}], please check it is not exist, remove this dependence" ,
definitionId ) ;
dependItemList . remove ( j ) ;
continue ;
}
Optional < Map . Entry < Long , Map < String , Long > > > mapEntry = processCodeTaskNameCodeMap . entrySet ( ) . stream ( ) . findFirst ( ) ;
Optional < Map . Entry < Long , Map < String , Long > > > mapEntry =
processCodeTaskNameCodeMap . entrySet ( ) . stream ( ) . findFirst ( ) ;
if ( mapEntry . isPresent ( ) ) {
Map . Entry < Long , Map < String , Long > > processCodeTaskNameCodeEntry = mapEntry . get ( ) ;
dependItem . put ( "definitionCode" , processCodeTaskNameCodeEntry . getKey ( ) ) ;
String depTasks = dependItem . get ( "depTasks" ) . asText ( ) ;
long taskCode = "ALL" . equals ( depTasks ) | | processCodeTaskNameCodeEntry . getValue ( ) = = null ? 0L : processCodeTaskNameCodeEntry . getValue ( ) . get ( depTasks ) ;
long taskCode =
"ALL" . equals ( depTasks ) | | processCodeTaskNameCodeEntry . getValue ( ) = = null ? 0L
: processCodeTaskNameCodeEntry . getValue ( ) . get ( depTasks ) ;
dependItem . put ( "depTaskCode" , taskCode ) ;
}
dependItem . remove ( "projectId" ) ;