Wenjun Ruan
4 months ago
committed by
GitHub
70 changed files with 2881 additions and 1198 deletions
@ -1,164 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.apache.dolphinscheduler.registry.api.ConnectionListener; |
||||
import org.apache.dolphinscheduler.registry.api.ConnectionState; |
||||
|
||||
import java.sql.SQLException; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.HashSet; |
||||
import java.util.List; |
||||
import java.util.Set; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
||||
|
||||
/** |
||||
* This thread is used to check the connect state to jdbc. |
||||
*/ |
||||
@Slf4j |
||||
class EphemeralDateManager implements AutoCloseable { |
||||
|
||||
private ConnectionState connectionState; |
||||
private final JdbcOperator jdbcOperator; |
||||
private final JdbcRegistryProperties registryProperties; |
||||
private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>()); |
||||
private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new HashSet<>()); |
||||
private final ScheduledExecutorService scheduledExecutorService; |
||||
|
||||
EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) { |
||||
this.registryProperties = registryProperties; |
||||
this.jdbcOperator = checkNotNull(jdbcOperator); |
||||
this.scheduledExecutorService = Executors.newScheduledThreadPool( |
||||
1, |
||||
new ThreadFactoryBuilder().setNameFormat("EphemeralDateTermRefreshThread").setDaemon(true).build()); |
||||
} |
||||
|
||||
public void start() { |
||||
this.scheduledExecutorService.scheduleWithFixedDelay( |
||||
new EphemeralDateTermRefreshTask(jdbcOperator, connectionListeners, ephemeralDateIds), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
} |
||||
|
||||
public void addConnectionListener(ConnectionListener connectionListener) { |
||||
connectionListeners.add(connectionListener); |
||||
} |
||||
|
||||
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException { |
||||
long ephemeralId = jdbcOperator.insertOrUpdateEphemeralData(key, value); |
||||
ephemeralDateIds.add(ephemeralId); |
||||
return ephemeralId; |
||||
} |
||||
|
||||
public ConnectionState getConnectionState() { |
||||
return connectionState; |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws SQLException { |
||||
for (Long ephemeralDateId : ephemeralDateIds) { |
||||
jdbcOperator.deleteDataById(ephemeralDateId); |
||||
} |
||||
ephemeralDateIds.clear(); |
||||
connectionListeners.clear(); |
||||
scheduledExecutorService.shutdownNow(); |
||||
} |
||||
|
||||
// Use this task to refresh ephemeral term and check the connect state.
|
||||
class EphemeralDateTermRefreshTask implements Runnable { |
||||
|
||||
private final List<ConnectionListener> connectionListeners; |
||||
private final Set<Long> ephemeralDateIds; |
||||
private final JdbcOperator jdbcOperator; |
||||
|
||||
private EphemeralDateTermRefreshTask(JdbcOperator jdbcOperator, |
||||
List<ConnectionListener> connectionListeners, |
||||
Set<Long> ephemeralDateIds) { |
||||
this.jdbcOperator = checkNotNull(jdbcOperator); |
||||
this.connectionListeners = checkNotNull(connectionListeners); |
||||
this.ephemeralDateIds = checkNotNull(ephemeralDateIds); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
try { |
||||
ConnectionState currentConnectionState = getConnectionState(); |
||||
if (currentConnectionState == connectionState) { |
||||
// no state change
|
||||
return; |
||||
} |
||||
|
||||
if (connectionState == ConnectionState.CONNECTED) { |
||||
if (currentConnectionState == ConnectionState.DISCONNECTED) { |
||||
connectionState = ConnectionState.DISCONNECTED; |
||||
triggerListener(ConnectionState.DISCONNECTED); |
||||
} |
||||
} else if (connectionState == ConnectionState.DISCONNECTED) { |
||||
if (currentConnectionState == ConnectionState.CONNECTED) { |
||||
connectionState = ConnectionState.CONNECTED; |
||||
triggerListener(ConnectionState.RECONNECTED); |
||||
} |
||||
} else if (connectionState == null) { |
||||
connectionState = currentConnectionState; |
||||
triggerListener(connectionState); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("Jdbc Registry connect state check task execute failed", e); |
||||
connectionState = ConnectionState.DISCONNECTED; |
||||
triggerListener(ConnectionState.DISCONNECTED); |
||||
} |
||||
} |
||||
|
||||
private ConnectionState getConnectionState() { |
||||
try { |
||||
if (ephemeralDateIds.isEmpty()) { |
||||
jdbcOperator.healthCheck(); |
||||
} else { |
||||
updateEphemeralDateTerm(); |
||||
} |
||||
jdbcOperator.clearExpireEphemeralDate(); |
||||
return ConnectionState.CONNECTED; |
||||
} catch (Exception ex) { |
||||
log.error("Get connection state error, meet an unknown exception", ex); |
||||
return ConnectionState.DISCONNECTED; |
||||
} |
||||
} |
||||
|
||||
private void updateEphemeralDateTerm() { |
||||
if (!jdbcOperator.updateEphemeralDataTerm(ephemeralDateIds)) { |
||||
log.warn("Update jdbc registry ephemeral data: {} term error", ephemeralDateIds); |
||||
} |
||||
} |
||||
|
||||
private void triggerListener(ConnectionState connectionState) { |
||||
for (ConnectionListener connectionListener : connectionListeners) { |
||||
connectionListener.onUpdate(connectionState); |
||||
} |
||||
} |
||||
} |
||||
} |
@ -1,184 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.sql.SQLException; |
||||
import java.util.Collection; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.springframework.dao.DuplicateKeyException; |
||||
|
||||
public final class JdbcOperator { |
||||
|
||||
private final JdbcRegistryDataMapper jdbcRegistryDataMapper; |
||||
private final JdbcRegistryLockMapper jdbcRegistryLockMapper; |
||||
private final long expireTimeWindow; |
||||
|
||||
JdbcOperator(JdbcRegistryProperties registryProperties, |
||||
JdbcRegistryDataMapper jdbcRegistryDataMapper, |
||||
JdbcRegistryLockMapper jdbcRegistryLockMapper) { |
||||
this.expireTimeWindow = |
||||
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); |
||||
this.jdbcRegistryDataMapper = jdbcRegistryDataMapper; |
||||
this.jdbcRegistryLockMapper = jdbcRegistryLockMapper; |
||||
} |
||||
|
||||
public void healthCheck() { |
||||
jdbcRegistryLockMapper.countAll(); |
||||
} |
||||
|
||||
public List<JdbcRegistryData> queryAllJdbcRegistryData() { |
||||
return jdbcRegistryDataMapper.selectAll(); |
||||
} |
||||
|
||||
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException { |
||||
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); |
||||
if (jdbcRegistryData != null) { |
||||
jdbcRegistryData.setDataValue(value); |
||||
jdbcRegistryData.setLastUpdateTime(new Date()); |
||||
jdbcRegistryData.setLastTerm(System.currentTimeMillis()); |
||||
if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) { |
||||
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); |
||||
} |
||||
return jdbcRegistryData.getId(); |
||||
} |
||||
jdbcRegistryData = JdbcRegistryData.builder() |
||||
.dataKey(key) |
||||
.dataValue(value) |
||||
.dataType(DataType.EPHEMERAL.getTypeValue()) |
||||
.lastTerm(System.currentTimeMillis()) |
||||
.lastUpdateTime(new Date()) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataMapper.insert(jdbcRegistryData); |
||||
return jdbcRegistryData.getId(); |
||||
} |
||||
|
||||
public long insertOrUpdatePersistentData(String key, String value) throws SQLException { |
||||
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); |
||||
if (jdbcRegistryData != null) { |
||||
jdbcRegistryData.setDataValue(value); |
||||
jdbcRegistryData.setLastUpdateTime(new Date()); |
||||
jdbcRegistryData.setLastTerm(System.currentTimeMillis()); |
||||
if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) { |
||||
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); |
||||
} |
||||
return jdbcRegistryData.getId(); |
||||
} |
||||
jdbcRegistryData = JdbcRegistryData.builder() |
||||
.dataKey(key) |
||||
.dataValue(value) |
||||
.dataType(DataType.PERSISTENT.getTypeValue()) |
||||
.lastTerm(System.currentTimeMillis()) |
||||
.lastUpdateTime(new Date()) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataMapper.insert(jdbcRegistryData); |
||||
return jdbcRegistryData.getId(); |
||||
} |
||||
|
||||
public void deleteDataByKey(String key) { |
||||
jdbcRegistryDataMapper.deleteByKey(key); |
||||
} |
||||
|
||||
public void deleteDataById(long id) { |
||||
jdbcRegistryDataMapper.deleteById(id); |
||||
} |
||||
|
||||
public void clearExpireLock() { |
||||
jdbcRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow); |
||||
} |
||||
|
||||
public void clearExpireEphemeralDate() { |
||||
jdbcRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow, |
||||
DataType.EPHEMERAL.getTypeValue()); |
||||
} |
||||
|
||||
public JdbcRegistryData getData(String key) throws SQLException { |
||||
return jdbcRegistryDataMapper.selectByKey(key); |
||||
} |
||||
|
||||
public List<String> getChildren(String key) throws SQLException { |
||||
return jdbcRegistryDataMapper.fuzzyQueryByKey(key) |
||||
.stream() |
||||
.map(JdbcRegistryData::getDataKey) |
||||
.filter(fullPath -> fullPath.length() > key.length()) |
||||
.map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
public boolean existKey(String key) { |
||||
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); |
||||
return jdbcRegistryData != null; |
||||
} |
||||
|
||||
/** |
||||
* Try to acquire the target Lock, if cannot acquire, return null. |
||||
*/ |
||||
public JdbcRegistryLock tryToAcquireLock(String key) { |
||||
JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder() |
||||
.lockKey(key) |
||||
.lockOwner(LockUtils.getLockOwner()) |
||||
.lastTerm(System.currentTimeMillis()) |
||||
.lastUpdateTime(new Date()) |
||||
.build(); |
||||
try { |
||||
jdbcRegistryLockMapper.insert(jdbcRegistryLock); |
||||
return jdbcRegistryLock; |
||||
} catch (Exception e) { |
||||
if (e instanceof DuplicateKeyException) { |
||||
return null; |
||||
} |
||||
throw e; |
||||
} |
||||
} |
||||
|
||||
public JdbcRegistryLock getLockById(long lockId) { |
||||
return jdbcRegistryLockMapper.selectById(lockId); |
||||
} |
||||
|
||||
public boolean releaseLock(long lockId) throws SQLException { |
||||
return jdbcRegistryLockMapper.deleteById(lockId) > 0; |
||||
} |
||||
|
||||
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) { |
||||
if (CollectionUtils.isEmpty(ephemeralDateIds)) { |
||||
return true; |
||||
} |
||||
return jdbcRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0; |
||||
} |
||||
|
||||
public boolean updateLockTerm(List<Long> lockIds) { |
||||
if (CollectionUtils.isEmpty(lockIds)) { |
||||
return true; |
||||
} |
||||
return jdbcRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,31 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
|
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
|
||||
public class JdbcRegistryThreadFactory { |
||||
|
||||
public static ScheduledExecutorService getDefaultSchedulerThreadExecutor() { |
||||
return ThreadUtils.newDaemonScheduledExecutorService("jdbc-registry-default-scheduler-thread-pool", |
||||
Runtime.getRuntime().availableProcessors()); |
||||
} |
||||
|
||||
} |
@ -1,161 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock; |
||||
import org.apache.dolphinscheduler.registry.api.RegistryException; |
||||
|
||||
import java.sql.SQLException; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.AccessLevel; |
||||
import lombok.RequiredArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
||||
|
||||
@Slf4j |
||||
class RegistryLockManager implements AutoCloseable { |
||||
|
||||
private final JdbcOperator jdbcOperator; |
||||
private final JdbcRegistryProperties registryProperties; |
||||
// lock owner -> lock
|
||||
private final Map<String, JdbcRegistryLock> lockHoldMap; |
||||
private final ScheduledExecutorService lockTermUpdateThreadPool; |
||||
|
||||
RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) { |
||||
this.registryProperties = registryProperties; |
||||
this.jdbcOperator = jdbcOperator; |
||||
this.lockHoldMap = new ConcurrentHashMap<>(); |
||||
this.lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor( |
||||
new ThreadFactoryBuilder().setNameFormat("JdbcRegistryLockTermRefreshThread").setDaemon(true).build()); |
||||
} |
||||
|
||||
public void start() { |
||||
lockTermUpdateThreadPool.scheduleWithFixedDelay( |
||||
new LockTermRefreshTask(lockHoldMap, jdbcOperator), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
} |
||||
|
||||
/** |
||||
* Acquire the lock, if cannot get the lock will await. |
||||
*/ |
||||
public void acquireLock(String lockKey) throws RegistryException { |
||||
try { |
||||
while (true) { |
||||
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey); |
||||
if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) { |
||||
return; |
||||
} |
||||
jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey); |
||||
if (jdbcRegistryLock != null) { |
||||
lockHoldMap.put(lockKey, jdbcRegistryLock); |
||||
return; |
||||
} |
||||
log.debug("Acquire the lock {} failed try again", lockKey); |
||||
// acquire failed, wait and try again
|
||||
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL); |
||||
} |
||||
} catch (Exception ex) { |
||||
throw new RegistryException("Acquire the lock: " + lockKey + " error", ex); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Acquire the lock, if cannot get the lock will await. |
||||
*/ |
||||
public boolean acquireLock(String lockKey, long timeout) throws RegistryException { |
||||
long startTime = System.currentTimeMillis(); |
||||
try { |
||||
while (System.currentTimeMillis() - startTime < timeout) { |
||||
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey); |
||||
if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) { |
||||
return true; |
||||
} |
||||
jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey); |
||||
if (jdbcRegistryLock != null) { |
||||
lockHoldMap.put(lockKey, jdbcRegistryLock); |
||||
return true; |
||||
} |
||||
log.debug("Acquire the lock {} failed try again", lockKey); |
||||
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL); |
||||
} |
||||
} catch (Exception e) { |
||||
throw new RegistryException("Acquire the lock: " + lockKey + " error", e); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
public void releaseLock(String lockKey) { |
||||
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey); |
||||
if (jdbcRegistryLock != null) { |
||||
try { |
||||
// the lock is unExit
|
||||
jdbcOperator.releaseLock(jdbcRegistryLock.getId()); |
||||
lockHoldMap.remove(lockKey); |
||||
} catch (SQLException e) { |
||||
lockHoldMap.remove(lockKey); |
||||
throw new RegistryException(String.format("Release lock: %s error", lockKey), e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
lockTermUpdateThreadPool.shutdownNow(); |
||||
for (Map.Entry<String, JdbcRegistryLock> lockEntry : lockHoldMap.entrySet()) { |
||||
releaseLock(lockEntry.getKey()); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* This task is used to refresh the lock held by the current server. |
||||
*/ |
||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE) |
||||
static class LockTermRefreshTask implements Runnable { |
||||
|
||||
private final Map<String, JdbcRegistryLock> lockHoldMap; |
||||
private final JdbcOperator jdbcOperator; |
||||
|
||||
public void run() { |
||||
try { |
||||
if (lockHoldMap.isEmpty()) { |
||||
return; |
||||
} |
||||
List<Long> lockIds = lockHoldMap.values() |
||||
.stream() |
||||
.map(JdbcRegistryLock::getId) |
||||
.collect(Collectors.toList()); |
||||
if (!jdbcOperator.updateLockTerm(lockIds)) { |
||||
log.warn("Update the lock: {} term failed.", lockIds); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("Update lock term error", e); |
||||
} |
||||
} |
||||
} |
||||
} |
@ -1,150 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData; |
||||
import org.apache.dolphinscheduler.registry.api.Event; |
||||
import org.apache.dolphinscheduler.registry.api.SubscribeListener; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.function.Function; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.RequiredArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
||||
|
||||
/** |
||||
* Used to refresh if the subscribe path has been changed. |
||||
*/ |
||||
@Slf4j |
||||
class SubscribeDataManager implements AutoCloseable { |
||||
|
||||
private final JdbcOperator jdbcOperator; |
||||
private final JdbcRegistryProperties registryProperties; |
||||
private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>(); |
||||
private final ScheduledExecutorService dataSubscribeCheckThreadPool; |
||||
private final Map<String, JdbcRegistryData> jdbcRegistryDataMap = new ConcurrentHashMap<>(); |
||||
|
||||
SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) { |
||||
this.registryProperties = registryProperties; |
||||
this.jdbcOperator = jdbcOperator; |
||||
this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool( |
||||
1, |
||||
new ThreadFactoryBuilder().setNameFormat("JdbcRegistrySubscribeDataCheckThread").setDaemon(true) |
||||
.build()); |
||||
} |
||||
|
||||
public void start() { |
||||
dataSubscribeCheckThreadPool.scheduleWithFixedDelay( |
||||
new RegistrySubscribeDataCheckTask(dataSubScribeMap, jdbcOperator, jdbcRegistryDataMap), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
registryProperties.getTermRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
} |
||||
|
||||
public void addListener(String path, SubscribeListener subscribeListener) { |
||||
dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(subscribeListener); |
||||
} |
||||
|
||||
public void removeListener(String path) { |
||||
dataSubScribeMap.remove(path); |
||||
} |
||||
|
||||
public JdbcRegistryData getData(String path) { |
||||
return jdbcRegistryDataMap.get(path); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
dataSubscribeCheckThreadPool.shutdownNow(); |
||||
dataSubScribeMap.clear(); |
||||
} |
||||
|
||||
@RequiredArgsConstructor |
||||
static class RegistrySubscribeDataCheckTask implements Runnable { |
||||
|
||||
private final Map<String, List<SubscribeListener>> dataSubScribeMap; |
||||
private final JdbcOperator jdbcOperator; |
||||
private final Map<String, JdbcRegistryData> jdbcRegistryDataMap; |
||||
|
||||
@Override |
||||
public void run() { |
||||
// query the full data from database, and update the jdbcRegistryDataMap
|
||||
try { |
||||
Map<String, JdbcRegistryData> currentJdbcDataMap = jdbcOperator.queryAllJdbcRegistryData() |
||||
.stream() |
||||
.collect(Collectors.toMap(JdbcRegistryData::getDataKey, Function.identity())); |
||||
// find the different
|
||||
List<JdbcRegistryData> addedData = new ArrayList<>(); |
||||
List<JdbcRegistryData> deletedData = new ArrayList<>(); |
||||
List<JdbcRegistryData> updatedData = new ArrayList<>(); |
||||
|
||||
for (Map.Entry<String, JdbcRegistryData> entry : currentJdbcDataMap.entrySet()) { |
||||
JdbcRegistryData newData = entry.getValue(); |
||||
JdbcRegistryData oldData = jdbcRegistryDataMap.get(entry.getKey()); |
||||
if (oldData == null) { |
||||
addedData.add(newData); |
||||
} else { |
||||
if (!entry.getValue().getLastUpdateTime().equals(oldData.getLastUpdateTime())) { |
||||
updatedData.add(newData); |
||||
} |
||||
} |
||||
} |
||||
|
||||
for (Map.Entry<String, JdbcRegistryData> entry : jdbcRegistryDataMap.entrySet()) { |
||||
if (!currentJdbcDataMap.containsKey(entry.getKey())) { |
||||
deletedData.add(entry.getValue()); |
||||
} |
||||
} |
||||
jdbcRegistryDataMap.clear(); |
||||
jdbcRegistryDataMap.putAll(currentJdbcDataMap); |
||||
// trigger listener
|
||||
for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) { |
||||
String subscribeKey = entry.getKey(); |
||||
List<SubscribeListener> subscribeListeners = entry.getValue(); |
||||
triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD); |
||||
triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE); |
||||
triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("Query data from jdbc registry error"); |
||||
} |
||||
} |
||||
|
||||
private void triggerListener(List<JdbcRegistryData> dataList, |
||||
String subscribeKey, |
||||
List<SubscribeListener> subscribeListeners, |
||||
Event.Type type) { |
||||
for (JdbcRegistryData data : dataList) { |
||||
if (data.getDataKey().startsWith(subscribeKey)) { |
||||
subscribeListeners.forEach(subscribeListener -> subscribeListener |
||||
.notify(new Event(data.getDataKey(), data.getDataKey(), data.getDataValue(), type))); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
} |
@ -0,0 +1,106 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.client; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryDataChangeListener; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
public interface IJdbcRegistryClient extends AutoCloseable { |
||||
|
||||
/** |
||||
* Start the jdbc registry client, once started, the client will connect to the jdbc registry server, and then it can be used. |
||||
*/ |
||||
void start(); |
||||
|
||||
/** |
||||
* Get identify of the client. |
||||
*/ |
||||
JdbcRegistryClientIdentify getJdbcRegistryClientIdentify(); |
||||
|
||||
/** |
||||
* Check the connectivity of the client. |
||||
*/ |
||||
boolean isConnectivity(); |
||||
|
||||
/** |
||||
* Subscribe the jdbc registry connection state change event. |
||||
*/ |
||||
void subscribeConnectionStateChange(ConnectionStateListener connectionStateListener); |
||||
|
||||
/** |
||||
* Subscribe the {@link JdbcRegistryDataDTO} change event. |
||||
*/ |
||||
void subscribeJdbcRegistryDataChange(JdbcRegistryDataChangeListener jdbcRegistryDataChangeListener); |
||||
|
||||
/** |
||||
* Get the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
Optional<JdbcRegistryDataDTO> getJdbcRegistryDataByKey(String key); |
||||
|
||||
/** |
||||
* Put the {@link JdbcRegistryDataDTO} to the jdbc registry server. |
||||
* <p> |
||||
* If the key is already exist, then update the {@link JdbcRegistryDataDTO}. If the key is not exist, then insert a new {@link JdbcRegistryDataDTO}. |
||||
*/ |
||||
void putJdbcRegistryData(String key, String value, DataType dataType); |
||||
|
||||
/** |
||||
* Delete the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
void deleteJdbcRegistryDataByKey(String key); |
||||
|
||||
/** |
||||
* List all the {@link JdbcRegistryDataDTO} children by key. |
||||
* <p> |
||||
* e.g. key = "/dolphinscheduler/master", and data exist in db is "/dolphinscheduler/master/master1", "/dolphinscheduler/master/master2" |
||||
* <p> |
||||
* then the return value will be ["master1", "master2"] |
||||
*/ |
||||
List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key); |
||||
|
||||
/** |
||||
* Check the key exist in the jdbc registry server. |
||||
*/ |
||||
boolean existJdbcRegistryDataKey(String key); |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key. this is a blocking method. if you want to stop the blocking, you can use interrupt the thread. |
||||
*/ |
||||
void acquireJdbcRegistryLock(String lockKey) throws IllegalArgumentException; |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key until timeout. |
||||
*/ |
||||
boolean acquireJdbcRegistryLock(String lockKey, long timeout); |
||||
|
||||
/** |
||||
* Release the jdbc registry lock by key, if the lockKey is not exist will do nothing. |
||||
*/ |
||||
void releaseJdbcRegistryLock(String lockKey); |
||||
|
||||
/** |
||||
* Close the jdbc registry client, once the client been closed, it cannot work anymore. |
||||
*/ |
||||
@Override |
||||
void close(); |
||||
} |
@ -0,0 +1,128 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.client; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; |
||||
import org.apache.dolphinscheduler.common.utils.NetUtils; |
||||
import org.apache.dolphinscheduler.common.utils.OSUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.IJdbcRegistryServer; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryDataChangeListener; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryServerState; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
/** |
||||
* The client of jdbc registry, used to interact with the {@link org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryServer}. |
||||
*/ |
||||
@Slf4j |
||||
public class JdbcRegistryClient implements IJdbcRegistryClient { |
||||
|
||||
private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" + OSUtils.getProcessID(); |
||||
|
||||
private final JdbcRegistryProperties jdbcRegistryProperties; |
||||
|
||||
private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify; |
||||
|
||||
private final IJdbcRegistryServer jdbcRegistryServer; |
||||
|
||||
public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) { |
||||
this.jdbcRegistryProperties = jdbcRegistryProperties; |
||||
this.jdbcRegistryServer = jdbcRegistryServer; |
||||
this.jdbcRegistryClientIdentify = |
||||
new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), DEFAULT_CLIENT_NAME); |
||||
} |
||||
|
||||
@Override |
||||
public void start() { |
||||
jdbcRegistryServer.registerClient(this); |
||||
} |
||||
|
||||
@Override |
||||
public JdbcRegistryClientIdentify getJdbcRegistryClientIdentify() { |
||||
return jdbcRegistryClientIdentify; |
||||
} |
||||
|
||||
@Override |
||||
public void subscribeConnectionStateChange(ConnectionStateListener connectionStateListener) { |
||||
jdbcRegistryServer.subscribeConnectionStateChange(connectionStateListener); |
||||
} |
||||
|
||||
@Override |
||||
public void subscribeJdbcRegistryDataChange(JdbcRegistryDataChangeListener jdbcRegistryDataChangeListener) { |
||||
jdbcRegistryServer.subscribeJdbcRegistryDataChange(jdbcRegistryDataChangeListener); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<JdbcRegistryDataDTO> getJdbcRegistryDataByKey(String key) { |
||||
return jdbcRegistryServer.getJdbcRegistryDataByKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public void putJdbcRegistryData(String key, String value, DataType dataType) { |
||||
jdbcRegistryServer.putJdbcRegistryData(jdbcRegistryClientIdentify.getClientId(), key, value, dataType); |
||||
} |
||||
|
||||
@Override |
||||
public void deleteJdbcRegistryDataByKey(String key) { |
||||
jdbcRegistryServer.deleteJdbcRegistryDataByKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key) { |
||||
return jdbcRegistryServer.listJdbcRegistryDataChildren(key); |
||||
} |
||||
|
||||
@Override |
||||
public boolean existJdbcRegistryDataKey(String key) { |
||||
return jdbcRegistryServer.existJdbcRegistryDataKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public void acquireJdbcRegistryLock(String key) { |
||||
jdbcRegistryServer.acquireJdbcRegistryLock(jdbcRegistryClientIdentify.getClientId(), key); |
||||
} |
||||
|
||||
@Override |
||||
public boolean acquireJdbcRegistryLock(String key, long timeout) { |
||||
return jdbcRegistryServer.acquireJdbcRegistryLock(jdbcRegistryClientIdentify.getClientId(), key, timeout); |
||||
} |
||||
|
||||
@Override |
||||
public void releaseJdbcRegistryLock(String key) { |
||||
jdbcRegistryServer.releaseJdbcRegistryLock(jdbcRegistryClientIdentify.getClientId(), key); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
jdbcRegistryServer.deregisterClient(this); |
||||
log.info("Closed JdbcRegistryClient: {}", jdbcRegistryClientIdentify); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isConnectivity() { |
||||
return jdbcRegistryServer.getServerState() == JdbcRegistryServerState.STARTED; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.client; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Getter; |
||||
import lombok.ToString; |
||||
|
||||
@ToString |
||||
@Getter |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryClientIdentify { |
||||
|
||||
private final Long clientId; |
||||
|
||||
private final String clientName; |
||||
|
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat; |
||||
|
||||
import org.apache.ibatis.annotations.Select; |
||||
|
||||
import java.util.List; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
public interface JdbcRegistryClientHeartbeatMapper extends BaseMapper<JdbcRegistryClientHeartbeat> { |
||||
|
||||
@Select("select * from t_ds_jdbc_registry_client_heartbeat") |
||||
List<JdbcRegistryClientHeartbeat> selectAll(); |
||||
|
||||
} |
@ -0,0 +1,41 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryDataChanceEvent; |
||||
|
||||
import org.apache.ibatis.annotations.Delete; |
||||
import org.apache.ibatis.annotations.Param; |
||||
import org.apache.ibatis.annotations.Select; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
public interface JdbcRegistryDataChanceEventMapper extends BaseMapper<JdbcRegistryDataChanceEvent> { |
||||
|
||||
@Select("select max(id) from t_ds_jdbc_registry_data_change_event") |
||||
Long getMaxId(); |
||||
|
||||
@Select("select * from t_ds_jdbc_registry_data_change_event where id > #{id} order by id asc limit 1000") |
||||
List<JdbcRegistryDataChanceEvent> selectJdbcRegistryDataChangeEventWhereIdAfter(@Param("id") long id); |
||||
|
||||
@Delete("delete from t_ds_jdbc_registry_data_change_event where create_time > #{createTime}") |
||||
void deleteJdbcRegistryDataChangeEventBeforeCreateTime(@Param("createTime") Date createTime); |
||||
} |
@ -0,0 +1,45 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType; |
||||
import com.baomidou.mybatisplus.annotation.TableId; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
|
||||
@TableName(value = "t_ds_jdbc_registry_client_heartbeat") |
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryClientHeartbeat { |
||||
|
||||
@TableId(value = "id", type = IdType.INPUT) |
||||
private Long id; |
||||
private String clientName; |
||||
private Long lastHeartbeatTime; |
||||
private String connectionConfig; |
||||
private Date createTime; |
||||
|
||||
} |
6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DO/JdbcRegistryData.java
6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DO/JdbcRegistryData.java
@ -0,0 +1,47 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType; |
||||
import com.baomidou.mybatisplus.annotation.TableId; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
|
||||
@TableName(value = "t_ds_jdbc_registry_data_change_event") |
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryDataChanceEvent { |
||||
|
||||
@TableId(value = "id", type = IdType.AUTO) |
||||
private Long id; |
||||
|
||||
private String eventType; |
||||
|
||||
private String jdbcRegistryData; |
||||
|
||||
private Date createTime; |
||||
|
||||
} |
20
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DO/JdbcRegistryLock.java
20
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DO/JdbcRegistryLock.java
15
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DataType.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/DataType.java
15
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DataType.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/DataType.java
@ -0,0 +1,97 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
import lombok.SneakyThrows; |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryClientHeartbeatDTO { |
||||
|
||||
private Long id; |
||||
|
||||
// clientName
|
||||
private String clientName; |
||||
|
||||
private Long lastHeartbeatTime; |
||||
|
||||
private ClientConfig clientConfig; |
||||
|
||||
private Date createTime; |
||||
|
||||
public static JdbcRegistryClientHeartbeatDTO fromJdbcRegistryClientHeartbeat(JdbcRegistryClientHeartbeat jdbcRegistryClientHeartbeat) { |
||||
return JdbcRegistryClientHeartbeatDTO.builder() |
||||
.id(jdbcRegistryClientHeartbeat.getId()) |
||||
.clientName(jdbcRegistryClientHeartbeat.getClientName()) |
||||
.lastHeartbeatTime(jdbcRegistryClientHeartbeat.getLastHeartbeatTime()) |
||||
.clientConfig( |
||||
JSONUtils.parseObject(jdbcRegistryClientHeartbeat.getConnectionConfig(), ClientConfig.class)) |
||||
.createTime(jdbcRegistryClientHeartbeat.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
public static JdbcRegistryClientHeartbeat toJdbcRegistryClientHeartbeat(JdbcRegistryClientHeartbeatDTO jdbcRegistryClientHeartbeatDTO) { |
||||
return JdbcRegistryClientHeartbeat.builder() |
||||
.id(jdbcRegistryClientHeartbeatDTO.getId()) |
||||
.clientName(jdbcRegistryClientHeartbeatDTO.getClientName()) |
||||
.lastHeartbeatTime(jdbcRegistryClientHeartbeatDTO.getLastHeartbeatTime()) |
||||
.connectionConfig(JSONUtils.toJsonString(jdbcRegistryClientHeartbeatDTO.getClientConfig())) |
||||
.createTime(jdbcRegistryClientHeartbeatDTO.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
public boolean isDead() { |
||||
// check if the client connection is expired.
|
||||
return System.currentTimeMillis() - lastHeartbeatTime > clientConfig.getSessionTimeout(); |
||||
} |
||||
|
||||
@SneakyThrows |
||||
@Override |
||||
public JdbcRegistryClientHeartbeatDTO clone() { |
||||
return JdbcRegistryClientHeartbeatDTO.builder() |
||||
.id(id) |
||||
.clientName(clientName) |
||||
.lastHeartbeatTime(lastHeartbeatTime) |
||||
.clientConfig(clientConfig) |
||||
.createTime(createTime) |
||||
.build(); |
||||
} |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public static class ClientConfig { |
||||
|
||||
@Builder.Default |
||||
private long sessionTimeout = 60 * 1000L; |
||||
|
||||
} |
||||
|
||||
} |
@ -0,0 +1,76 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryData; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryDataChanceEvent; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryDataChanceEventDTO { |
||||
|
||||
private Long id; |
||||
|
||||
private EventType eventType; |
||||
|
||||
private JdbcRegistryDataDTO jdbcRegistryData; |
||||
|
||||
private Date createTime; |
||||
|
||||
public enum EventType { |
||||
ADD, |
||||
UPDATE, |
||||
DELETE; |
||||
|
||||
} |
||||
|
||||
public static JdbcRegistryDataChanceEventDTO fromJdbcRegistryDataChanceEvent(JdbcRegistryDataChanceEvent jdbcRegistryDataChanceEvent) { |
||||
JdbcRegistryData jdbcRegistryData = |
||||
JSONUtils.parseObject(jdbcRegistryDataChanceEvent.getJdbcRegistryData(), JdbcRegistryData.class); |
||||
if (jdbcRegistryData == null) { |
||||
throw new IllegalArgumentException( |
||||
"jdbcRegistryData: " + jdbcRegistryDataChanceEvent.getJdbcRegistryData() + " is invalidated"); |
||||
} |
||||
return JdbcRegistryDataChanceEventDTO.builder() |
||||
.id(jdbcRegistryDataChanceEvent.getId()) |
||||
.jdbcRegistryData(JdbcRegistryDataDTO.fromJdbcRegistryData(jdbcRegistryData)) |
||||
.eventType(EventType.valueOf(jdbcRegistryDataChanceEvent.getEventType())) |
||||
.createTime(jdbcRegistryDataChanceEvent.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
public static JdbcRegistryDataChanceEvent toJdbcRegistryDataChanceEvent(JdbcRegistryDataChanceEventDTO jdbcRegistryDataChanceEvent) { |
||||
return JdbcRegistryDataChanceEvent.builder() |
||||
.id(jdbcRegistryDataChanceEvent.getId()) |
||||
.jdbcRegistryData(JSONUtils.toJsonString(jdbcRegistryDataChanceEvent.getJdbcRegistryData())) |
||||
.eventType(jdbcRegistryDataChanceEvent.getEventType().name()) |
||||
.createTime(jdbcRegistryDataChanceEvent.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,67 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryData; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryDataDTO { |
||||
|
||||
private Long id; |
||||
private String dataKey; |
||||
private String dataValue; |
||||
private String dataType; |
||||
private Long clientId; |
||||
private Date createTime; |
||||
private Date lastUpdateTime; |
||||
|
||||
public static JdbcRegistryDataDTO fromJdbcRegistryData(JdbcRegistryData jdbcRegistryData) { |
||||
return JdbcRegistryDataDTO.builder() |
||||
.id(jdbcRegistryData.getId()) |
||||
.dataKey(jdbcRegistryData.getDataKey()) |
||||
.dataValue(jdbcRegistryData.getDataValue()) |
||||
.dataType(jdbcRegistryData.getDataType()) |
||||
.clientId(jdbcRegistryData.getClientId()) |
||||
.createTime(jdbcRegistryData.getCreateTime()) |
||||
.lastUpdateTime(jdbcRegistryData.getLastUpdateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
public static JdbcRegistryData toJdbcRegistryData(JdbcRegistryDataDTO jdbcRegistryData) { |
||||
return JdbcRegistryData.builder() |
||||
.id(jdbcRegistryData.getId()) |
||||
.dataKey(jdbcRegistryData.getDataKey()) |
||||
.dataValue(jdbcRegistryData.getDataValue()) |
||||
.dataType(jdbcRegistryData.getDataType()) |
||||
.clientId(jdbcRegistryData.getClientId()) |
||||
.createTime(jdbcRegistryData.getCreateTime()) |
||||
.lastUpdateTime(jdbcRegistryData.getLastUpdateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,61 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryLock; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public class JdbcRegistryLockDTO { |
||||
|
||||
private Long id; |
||||
private String lockKey; |
||||
private String lockOwner; |
||||
private Long clientId; |
||||
private Date createTime; |
||||
|
||||
public static JdbcRegistryLockDTO fromJdbcRegistryLock(JdbcRegistryLock jdbcRegistryLock) { |
||||
return JdbcRegistryLockDTO.builder() |
||||
.id(jdbcRegistryLock.getId()) |
||||
.lockKey(jdbcRegistryLock.getLockKey()) |
||||
.lockOwner(jdbcRegistryLock.getLockOwner()) |
||||
.clientId(jdbcRegistryLock.getClientId()) |
||||
.createTime(jdbcRegistryLock.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
public static JdbcRegistryLock toJdbcRegistryLock(JdbcRegistryLockDTO jdbcRegistryLock) { |
||||
return JdbcRegistryLock.builder() |
||||
.id(jdbcRegistryLock.getId()) |
||||
.lockKey(jdbcRegistryLock.getLockKey()) |
||||
.lockOwner(jdbcRegistryLock.getLockOwner()) |
||||
.clientId(jdbcRegistryLock.getClientId()) |
||||
.createTime(jdbcRegistryLock.getCreateTime()) |
||||
.build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,67 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.repository; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryClientHeartbeatMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryClientHeartbeatDTO; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
|
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Repository; |
||||
|
||||
@Repository |
||||
public class JdbcRegistryClientRepository { |
||||
|
||||
@Autowired |
||||
private JdbcRegistryClientHeartbeatMapper jdbcRegistryClientHeartbeatMapper; |
||||
|
||||
public List<JdbcRegistryClientHeartbeatDTO> queryAll() { |
||||
return jdbcRegistryClientHeartbeatMapper.selectAll() |
||||
.stream() |
||||
.map(JdbcRegistryClientHeartbeatDTO::fromJdbcRegistryClientHeartbeat) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
public void deleteByIds(List<Long> clientIds) { |
||||
if (CollectionUtils.isEmpty(clientIds)) { |
||||
return; |
||||
} |
||||
jdbcRegistryClientHeartbeatMapper.deleteBatchIds(clientIds); |
||||
} |
||||
|
||||
public boolean updateById(JdbcRegistryClientHeartbeatDTO jdbcRegistryClientHeartbeatDTO) { |
||||
JdbcRegistryClientHeartbeat jdbcRegistryClientHeartbeat = |
||||
JdbcRegistryClientHeartbeatDTO.toJdbcRegistryClientHeartbeat(jdbcRegistryClientHeartbeatDTO); |
||||
return jdbcRegistryClientHeartbeatMapper.updateById(jdbcRegistryClientHeartbeat) == 1; |
||||
} |
||||
|
||||
public void insert(JdbcRegistryClientHeartbeatDTO jdbcRegistryClient) { |
||||
checkNotNull(jdbcRegistryClient.getId()); |
||||
JdbcRegistryClientHeartbeat jdbcRegistryClientHeartbeat = |
||||
JdbcRegistryClientHeartbeatDTO.toJdbcRegistryClientHeartbeat(jdbcRegistryClient); |
||||
jdbcRegistryClientHeartbeatMapper.insert(jdbcRegistryClientHeartbeat); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,65 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.repository; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataChanceEventMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryDataChanceEvent; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataChanceEventDTO; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.springframework.stereotype.Repository; |
||||
|
||||
@Repository |
||||
public class JdbcRegistryDataChanceEventRepository { |
||||
|
||||
private final JdbcRegistryDataChanceEventMapper jdbcRegistryDataChanceEventMapper; |
||||
|
||||
public JdbcRegistryDataChanceEventRepository(JdbcRegistryDataChanceEventMapper jdbcRegistryDataChanceEventMapper) { |
||||
this.jdbcRegistryDataChanceEventMapper = jdbcRegistryDataChanceEventMapper; |
||||
} |
||||
|
||||
public long getMaxJdbcRegistryDataChanceEventId() { |
||||
Long maxId = jdbcRegistryDataChanceEventMapper.getMaxId(); |
||||
if (maxId == null) { |
||||
return -1; |
||||
} else { |
||||
return maxId; |
||||
} |
||||
} |
||||
|
||||
public List<JdbcRegistryDataChanceEventDTO> selectJdbcRegistryDataChangeEventWhereIdAfter(long id) { |
||||
return jdbcRegistryDataChanceEventMapper.selectJdbcRegistryDataChangeEventWhereIdAfter(id) |
||||
.stream() |
||||
.map(JdbcRegistryDataChanceEventDTO::fromJdbcRegistryDataChanceEvent) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
public void insert(JdbcRegistryDataChanceEventDTO registryDataChanceEvent) { |
||||
JdbcRegistryDataChanceEvent jdbcRegistryDataChanceEvent = |
||||
JdbcRegistryDataChanceEventDTO.toJdbcRegistryDataChanceEvent(registryDataChanceEvent); |
||||
jdbcRegistryDataChanceEventMapper.insert(jdbcRegistryDataChanceEvent); |
||||
registryDataChanceEvent.setId(jdbcRegistryDataChanceEvent.getId()); |
||||
} |
||||
|
||||
public void deleteJdbcRegistryDataChangeEventBeforeCreateTime(Date createTime) { |
||||
jdbcRegistryDataChanceEventMapper.deleteJdbcRegistryDataChangeEventBeforeCreateTime(createTime); |
||||
} |
||||
} |
@ -0,0 +1,73 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.repository; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryData; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Repository; |
||||
|
||||
@Repository |
||||
public class JdbcRegistryDataRepository { |
||||
|
||||
@Autowired |
||||
private JdbcRegistryDataMapper jdbcRegistryDataMapper; |
||||
|
||||
public List<JdbcRegistryDataDTO> selectAll() { |
||||
return jdbcRegistryDataMapper |
||||
.selectAll() |
||||
.stream() |
||||
.map(JdbcRegistryDataDTO::fromJdbcRegistryData) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
public Optional<JdbcRegistryDataDTO> selectByKey(String key) { |
||||
return Optional.ofNullable(jdbcRegistryDataMapper.selectByKey(key)) |
||||
.map(JdbcRegistryDataDTO::fromJdbcRegistryData); |
||||
} |
||||
|
||||
public void deleteEphemeralDateByClientIds(List<Long> clientIds) { |
||||
if (CollectionUtils.isEmpty(clientIds)) { |
||||
return; |
||||
} |
||||
jdbcRegistryDataMapper.deleteByClientIds(clientIds, DataType.EPHEMERAL.name()); |
||||
} |
||||
|
||||
public void deleteByKey(String key) { |
||||
jdbcRegistryDataMapper.deleteByKey(key); |
||||
} |
||||
|
||||
public void insert(JdbcRegistryDataDTO jdbcRegistryData) { |
||||
JdbcRegistryData jdbcRegistryDataDO = JdbcRegistryDataDTO.toJdbcRegistryData(jdbcRegistryData); |
||||
jdbcRegistryDataMapper.insert(jdbcRegistryDataDO); |
||||
jdbcRegistryData.setId(jdbcRegistryDataDO.getId()); |
||||
} |
||||
|
||||
public void updateById(JdbcRegistryDataDTO jdbcRegistryDataDTO) { |
||||
jdbcRegistryDataMapper.updateById(JdbcRegistryDataDTO.toJdbcRegistryData(jdbcRegistryDataDTO)); |
||||
} |
||||
} |
@ -0,0 +1,56 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.repository; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryLock; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryLockDTO; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Repository; |
||||
|
||||
@Repository |
||||
public class JdbcRegistryLockRepository { |
||||
|
||||
@Autowired |
||||
private JdbcRegistryLockMapper jdbcRegistryLockMapper; |
||||
|
||||
public void deleteByClientIds(List<Long> clientIds) { |
||||
if (CollectionUtils.isEmpty(clientIds)) { |
||||
return; |
||||
} |
||||
jdbcRegistryLockMapper.deleteByClientIds(clientIds); |
||||
} |
||||
|
||||
public void insert(JdbcRegistryLockDTO jdbcRegistryLock) { |
||||
checkNotNull(jdbcRegistryLock); |
||||
JdbcRegistryLock jdbcRegistryLockDO = JdbcRegistryLockDTO.toJdbcRegistryLock(jdbcRegistryLock); |
||||
jdbcRegistryLockMapper.insert(jdbcRegistryLockDO); |
||||
jdbcRegistryLock.setId(jdbcRegistryLockDO.getId()); |
||||
} |
||||
|
||||
public void deleteById(Long id) { |
||||
jdbcRegistryLockMapper.deleteById(id); |
||||
} |
||||
} |
10
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/ConnectionStateListener.java
10
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/ConnectionStateListener.java
@ -0,0 +1,55 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
public interface IJdbcRegistryDataManager { |
||||
|
||||
boolean existKey(String key); |
||||
|
||||
/** |
||||
* Get the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
Optional<JdbcRegistryDataDTO> getRegistryDataByKey(String key); |
||||
|
||||
/** |
||||
* List all the {@link JdbcRegistryDataDTO} children by key. |
||||
* <p> |
||||
* e.g. key = "/dolphinscheduler/master", and data exist in db is "/dolphinscheduler/master/master1", "/dolphinscheduler/master/master2" |
||||
* <p> |
||||
* then the return value will be ["master1", "master2"] |
||||
*/ |
||||
List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key); |
||||
|
||||
/** |
||||
* Put the {@link JdbcRegistryDataDTO} to the jdbc registry server. |
||||
* <p> |
||||
* If the key is already exist, then update the {@link JdbcRegistryDataDTO}. If the key is not exist, then insert a new {@link JdbcRegistryDataDTO}. |
||||
*/ |
||||
void putJdbcRegistryData(Long clientId, String key, String value, DataType dataType); |
||||
|
||||
/** |
||||
* Delete the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
void deleteJdbcRegistryDataByKey(String key); |
||||
} |
@ -0,0 +1,37 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
public interface IJdbcRegistryLockManager { |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key. this is a blocking method. if you want to stop the blocking, you can use interrupt the thread. |
||||
*/ |
||||
void acquireJdbcRegistryLock(Long clientId, String lockKey) throws InterruptedException; |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key until timeout. |
||||
*/ |
||||
boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeout); |
||||
|
||||
/** |
||||
* Release the jdbc registry lock by key, if the lockKey is not exist will do nothing. |
||||
*/ |
||||
void releaseJdbcRegistryLock(Long clientId, String lockKey); |
||||
|
||||
} |
@ -0,0 +1,110 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.client.IJdbcRegistryClient; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
/** |
||||
* The JdbcRegistryServer is represent the server side of the jdbc registry, it can be thought as db server. |
||||
*/ |
||||
public interface IJdbcRegistryServer extends AutoCloseable { |
||||
|
||||
void start(); |
||||
|
||||
/** |
||||
* Register a client to the server, once the client connect to the server then the server will refresh the client's term interval. |
||||
*/ |
||||
void registerClient(IJdbcRegistryClient jdbcRegistryClient); |
||||
|
||||
/** |
||||
* Deregister a client to the server, once the client id deregister, then the server will deleted the data related to the client and stop refresh the client's term. |
||||
*/ |
||||
void deregisterClient(IJdbcRegistryClient jdbcRegistryClient); |
||||
|
||||
/** |
||||
* Get the {@link JdbcRegistryServerState} |
||||
*/ |
||||
JdbcRegistryServerState getServerState(); |
||||
|
||||
/** |
||||
* Subscribe the jdbc registry connection state change |
||||
*/ |
||||
void subscribeConnectionStateChange(ConnectionStateListener connectionStateListener); |
||||
|
||||
/** |
||||
* Subscribe the {@link org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryData} change. |
||||
*/ |
||||
void subscribeJdbcRegistryDataChange(JdbcRegistryDataChangeListener jdbcRegistryDataChangeListener); |
||||
|
||||
/** |
||||
* Check the jdbc registry data key is exist or not. |
||||
*/ |
||||
boolean existJdbcRegistryDataKey(String key); |
||||
|
||||
/** |
||||
* Get the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
Optional<JdbcRegistryDataDTO> getJdbcRegistryDataByKey(String key); |
||||
|
||||
/** |
||||
* List all the {@link JdbcRegistryDataDTO} children by key. |
||||
* <p> |
||||
* e.g. key = "/dolphinscheduler/master", and data exist in db is "/dolphinscheduler/master/master1", "/dolphinscheduler/master/master2" |
||||
* <p> |
||||
* then the return value will be ["master1", "master2"] |
||||
*/ |
||||
List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key); |
||||
|
||||
/** |
||||
* Put the {@link JdbcRegistryDataDTO} to the jdbc registry server. |
||||
* <p> |
||||
* If the key is already exist, then update the {@link JdbcRegistryDataDTO}. If the key is not exist, then insert a new {@link JdbcRegistryDataDTO}. |
||||
*/ |
||||
void putJdbcRegistryData(Long clientId, String key, String value, DataType dataType); |
||||
|
||||
/** |
||||
* Delete the {@link JdbcRegistryDataDTO} by key. |
||||
*/ |
||||
void deleteJdbcRegistryDataByKey(String key); |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key. this is a blocking method. if you want to stop the blocking, you can use interrupt the thread. |
||||
*/ |
||||
void acquireJdbcRegistryLock(Long clientId, String key); |
||||
|
||||
/** |
||||
* Acquire the jdbc registry lock by key until timeout. |
||||
*/ |
||||
boolean acquireJdbcRegistryLock(Long clientId, String key, long timeout); |
||||
|
||||
/** |
||||
* Release the jdbc registry lock by key, if the lockKey is not exist will do nothing. |
||||
*/ |
||||
void releaseJdbcRegistryLock(Long clientId, String key); |
||||
|
||||
/** |
||||
* Close the server, once the server been closed, it cannot work anymore. |
||||
*/ |
||||
@Override |
||||
void close(); |
||||
} |
@ -0,0 +1,35 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
public interface IRegistryRowChangeNotifier<T> { |
||||
|
||||
void start(); |
||||
|
||||
void subscribeRegistryRowChange(RegistryRowChangeListener<T> registryRowChangeListener); |
||||
|
||||
interface RegistryRowChangeListener<T> { |
||||
|
||||
void onRegistryRowUpdated(T data); |
||||
|
||||
void onRegistryRowAdded(T data); |
||||
|
||||
void onRegistryRowDeleted(T data); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,28 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
public interface JdbcRegistryDataChangeListener { |
||||
|
||||
void onJdbcRegistryDataChanged(String key, String value); |
||||
|
||||
void onJdbcRegistryDataDeleted(String key); |
||||
|
||||
void onJdbcRegistryDataAdded(String key, String value); |
||||
|
||||
} |
@ -0,0 +1,265 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataChanceEventDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataChanceEventRepository; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataRepository; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
import org.apache.commons.lang3.time.DateUtils; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CopyOnWriteArrayList; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.common.collect.Lists; |
||||
|
||||
@Slf4j |
||||
public class JdbcRegistryDataManager |
||||
implements |
||||
IRegistryRowChangeNotifier<JdbcRegistryDataDTO>, |
||||
IJdbcRegistryDataManager { |
||||
|
||||
private final Integer keepJdbcRegistryDataChanceEventHours = 2; |
||||
|
||||
private final JdbcRegistryProperties registryProperties; |
||||
|
||||
private final JdbcRegistryDataRepository jdbcRegistryDataRepository; |
||||
|
||||
private final JdbcRegistryDataChanceEventRepository jdbcRegistryDataChanceEventRepository; |
||||
|
||||
private final List<RegistryRowChangeListener<JdbcRegistryDataDTO>> registryRowChangeListeners; |
||||
|
||||
private long lastDetectedJdbcRegistryDataChangeEventId = -1; |
||||
|
||||
public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties, |
||||
JdbcRegistryDataRepository jdbcRegistryDataRepository, |
||||
JdbcRegistryDataChanceEventRepository jdbcRegistryDataChanceEventRepository) { |
||||
this.registryProperties = registryProperties; |
||||
this.jdbcRegistryDataChanceEventRepository = jdbcRegistryDataChanceEventRepository; |
||||
this.jdbcRegistryDataRepository = jdbcRegistryDataRepository; |
||||
this.registryRowChangeListeners = new CopyOnWriteArrayList<>(); |
||||
this.lastDetectedJdbcRegistryDataChangeEventId = |
||||
jdbcRegistryDataChanceEventRepository.getMaxJdbcRegistryDataChanceEventId(); |
||||
} |
||||
|
||||
@Override |
||||
public void start() { |
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( |
||||
this::detectJdbcRegistryDataChangeEvent, |
||||
registryProperties.getHeartbeatRefreshInterval().toMillis(), |
||||
registryProperties.getHeartbeatRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
|
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( |
||||
this::purgeHistoryJdbcRegistryDataChangeEvent, |
||||
0, |
||||
Duration.ofHours(keepJdbcRegistryDataChanceEventHours).toHours(), |
||||
TimeUnit.HOURS); |
||||
} |
||||
|
||||
private void detectJdbcRegistryDataChangeEvent() { |
||||
final List<JdbcRegistryDataChanceEventDTO> jdbcRegistryDataChanceEvents = jdbcRegistryDataChanceEventRepository |
||||
.selectJdbcRegistryDataChangeEventWhereIdAfter(lastDetectedJdbcRegistryDataChangeEventId); |
||||
if (CollectionUtils.isEmpty(jdbcRegistryDataChanceEvents)) { |
||||
return; |
||||
} |
||||
for (JdbcRegistryDataChanceEventDTO jdbcRegistryDataChanceEvent : jdbcRegistryDataChanceEvents) { |
||||
log.debug("Detect JdbcRegistryDataChangeEvent: {}", jdbcRegistryDataChanceEvent); |
||||
switch (jdbcRegistryDataChanceEvent.getEventType()) { |
||||
case ADD: |
||||
doTriggerJdbcRegistryDataAddedListener( |
||||
Lists.newArrayList(jdbcRegistryDataChanceEvent.getJdbcRegistryData())); |
||||
break; |
||||
case UPDATE: |
||||
doTriggerJdbcRegistryDataUpdatedListener( |
||||
Lists.newArrayList(jdbcRegistryDataChanceEvent.getJdbcRegistryData())); |
||||
break; |
||||
case DELETE: |
||||
doTriggerJdbcRegistryDataRemovedListener( |
||||
Lists.newArrayList(jdbcRegistryDataChanceEvent.getJdbcRegistryData())); |
||||
break; |
||||
default: |
||||
log.error("Unknown event type: {}", jdbcRegistryDataChanceEvent.getEventType()); |
||||
break; |
||||
} |
||||
if (jdbcRegistryDataChanceEvent.getId() > lastDetectedJdbcRegistryDataChangeEventId) { |
||||
lastDetectedJdbcRegistryDataChangeEventId = jdbcRegistryDataChanceEvent.getId(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void purgeHistoryJdbcRegistryDataChangeEvent() { |
||||
log.info("Purge JdbcRegistryDataChanceEvent which createTime is before: {} hours", |
||||
keepJdbcRegistryDataChanceEventHours); |
||||
jdbcRegistryDataChanceEventRepository.deleteJdbcRegistryDataChangeEventBeforeCreateTime( |
||||
DateUtils.addHours(new Date(), -keepJdbcRegistryDataChanceEventHours)); |
||||
} |
||||
|
||||
@Override |
||||
public void subscribeRegistryRowChange(RegistryRowChangeListener<JdbcRegistryDataDTO> registryRowChangeListener) { |
||||
registryRowChangeListeners.add(checkNotNull(registryRowChangeListener)); |
||||
} |
||||
|
||||
@Override |
||||
public boolean existKey(String key) { |
||||
checkNotNull(key); |
||||
return jdbcRegistryDataRepository.selectByKey(key).isPresent(); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<JdbcRegistryDataDTO> getRegistryDataByKey(String key) { |
||||
checkNotNull(key); |
||||
return jdbcRegistryDataRepository.selectByKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key) { |
||||
checkNotNull(key); |
||||
return jdbcRegistryDataRepository.selectAll() |
||||
.stream() |
||||
.filter(jdbcRegistryDataDTO -> jdbcRegistryDataDTO.getDataKey().startsWith(key) |
||||
&& !jdbcRegistryDataDTO.getDataKey().equals(key)) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
@Override |
||||
public void putJdbcRegistryData(Long clientId, String key, String value, DataType dataType) { |
||||
checkNotNull(clientId); |
||||
checkNotNull(key); |
||||
checkNotNull(dataType); |
||||
|
||||
Optional<JdbcRegistryDataDTO> jdbcRegistryDataOptional = jdbcRegistryDataRepository.selectByKey(key); |
||||
if (jdbcRegistryDataOptional.isPresent()) { |
||||
JdbcRegistryDataDTO jdbcRegistryData = jdbcRegistryDataOptional.get(); |
||||
if (!dataType.name().equals(jdbcRegistryData.getDataType())) { |
||||
throw new UnsupportedOperationException("The data type: " + jdbcRegistryData.getDataType() |
||||
+ " of the key: " + key + " cannot be updated"); |
||||
} |
||||
|
||||
if (DataType.EPHEMERAL.name().equals(jdbcRegistryData.getDataType())) { |
||||
if (!jdbcRegistryData.getClientId().equals(clientId)) { |
||||
throw new UnsupportedOperationException( |
||||
"The EPHEMERAL data: " + key + " can only be updated by its owner: " |
||||
+ jdbcRegistryData.getClientId() + " but not: " + clientId); |
||||
} |
||||
} |
||||
|
||||
jdbcRegistryData.setDataValue(value); |
||||
jdbcRegistryData.setLastUpdateTime(new Date()); |
||||
jdbcRegistryDataRepository.updateById(jdbcRegistryData); |
||||
|
||||
JdbcRegistryDataChanceEventDTO jdbcRegistryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder() |
||||
.jdbcRegistryData(jdbcRegistryData) |
||||
.eventType(JdbcRegistryDataChanceEventDTO.EventType.UPDATE) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataChanceEventRepository.insert(jdbcRegistryDataChanceEvent); |
||||
} else { |
||||
JdbcRegistryDataDTO jdbcRegistryDataDTO = JdbcRegistryDataDTO.builder() |
||||
.clientId(clientId) |
||||
.dataKey(key) |
||||
.dataValue(value) |
||||
.dataType(dataType.name()) |
||||
.createTime(new Date()) |
||||
.lastUpdateTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataRepository.insert(jdbcRegistryDataDTO); |
||||
JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder() |
||||
.jdbcRegistryData(jdbcRegistryDataDTO) |
||||
.eventType(JdbcRegistryDataChanceEventDTO.EventType.ADD) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataChanceEventRepository.insert(registryDataChanceEvent); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public void deleteJdbcRegistryDataByKey(String key) { |
||||
checkNotNull(key); |
||||
// todo: this is not atomic, need to be improved
|
||||
Optional<JdbcRegistryDataDTO> jdbcRegistryDataOptional = jdbcRegistryDataRepository.selectByKey(key); |
||||
if (!jdbcRegistryDataOptional.isPresent()) { |
||||
return; |
||||
} |
||||
jdbcRegistryDataRepository.deleteByKey(key); |
||||
JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder() |
||||
.jdbcRegistryData(jdbcRegistryDataOptional.get()) |
||||
.eventType(JdbcRegistryDataChanceEventDTO.EventType.DELETE) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
jdbcRegistryDataChanceEventRepository.insert(registryDataChanceEvent); |
||||
} |
||||
|
||||
private void doTriggerJdbcRegistryDataAddedListener(List<JdbcRegistryDataDTO> valuesToAdd) { |
||||
if (CollectionUtils.isEmpty(valuesToAdd)) { |
||||
return; |
||||
} |
||||
log.debug("Trigger:onJdbcRegistryDataAdded: {}", valuesToAdd); |
||||
valuesToAdd.forEach(jdbcRegistryData -> { |
||||
try { |
||||
registryRowChangeListeners.forEach(listener -> listener.onRegistryRowAdded(jdbcRegistryData)); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onRegistryRowAdded: {} failed", jdbcRegistryData, ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void doTriggerJdbcRegistryDataRemovedListener(List<JdbcRegistryDataDTO> valuesToRemoved) { |
||||
if (CollectionUtils.isEmpty(valuesToRemoved)) { |
||||
return; |
||||
} |
||||
log.debug("Trigger:onJdbcRegistryDataDeleted: {}", valuesToRemoved); |
||||
valuesToRemoved.forEach(jdbcRegistryData -> { |
||||
try { |
||||
registryRowChangeListeners.forEach(listener -> listener.onRegistryRowDeleted(jdbcRegistryData)); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onRegistryRowAdded: {} failed", jdbcRegistryData, ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void doTriggerJdbcRegistryDataUpdatedListener(List<JdbcRegistryDataDTO> valuesToUpdated) { |
||||
if (CollectionUtils.isEmpty(valuesToUpdated)) { |
||||
return; |
||||
} |
||||
log.debug("Trigger:onJdbcRegistryDataUpdated: {}", valuesToUpdated); |
||||
valuesToUpdated.forEach(jdbcRegistryData -> { |
||||
try { |
||||
registryRowChangeListeners.forEach(listener -> listener.onRegistryRowUpdated(jdbcRegistryData)); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onRegistryRowAdded: {} failed", jdbcRegistryData, ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,149 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.LockUtils; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryLockDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryLockRepository; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.dao.DuplicateKeyException; |
||||
|
||||
@Slf4j |
||||
public class JdbcRegistryLockManager implements IJdbcRegistryLockManager { |
||||
|
||||
private final JdbcRegistryProperties jdbcRegistryProperties; |
||||
private final JdbcRegistryLockRepository jdbcRegistryLockRepository; |
||||
|
||||
// lockKey -> LockEntry
|
||||
private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new HashMap<>(); |
||||
|
||||
public JdbcRegistryLockManager(JdbcRegistryProperties jdbcRegistryProperties, |
||||
JdbcRegistryLockRepository jdbcRegistryLockRepository) { |
||||
this.jdbcRegistryProperties = jdbcRegistryProperties; |
||||
this.jdbcRegistryLockRepository = jdbcRegistryLockRepository; |
||||
} |
||||
|
||||
@Override |
||||
public void acquireJdbcRegistryLock(Long clientId, String lockKey) { |
||||
String lockOwner = LockUtils.getLockOwner(); |
||||
while (true) { |
||||
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey); |
||||
if (lockEntry != null && lockOwner.equals(lockEntry.getLockOwner())) { |
||||
return; |
||||
} |
||||
JdbcRegistryLockDTO jdbcRegistryLock = JdbcRegistryLockDTO.builder() |
||||
.lockKey(lockKey) |
||||
.clientId(clientId) |
||||
.lockOwner(lockOwner) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
try { |
||||
jdbcRegistryLockRepository.insert(jdbcRegistryLock); |
||||
if (jdbcRegistryLock != null) { |
||||
jdbcRegistryLockHolderMap.put(lockKey, LockEntry.builder() |
||||
.lockKey(lockKey) |
||||
.lockOwner(lockOwner) |
||||
.jdbcRegistryLock(jdbcRegistryLock) |
||||
.build()); |
||||
return; |
||||
} |
||||
log.debug("{} acquire the lock {} success", lockOwner, lockKey); |
||||
} catch (DuplicateKeyException duplicateKeyException) { |
||||
// The lock is already exist, wait it release.
|
||||
continue; |
||||
} |
||||
log.debug("Acquire the lock {} failed try again", lockKey); |
||||
// acquire failed, wait and try again
|
||||
ThreadUtils.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeout) { |
||||
String lockOwner = LockUtils.getLockOwner(); |
||||
long start = System.currentTimeMillis(); |
||||
while (System.currentTimeMillis() - start <= timeout) { |
||||
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey); |
||||
if (lockEntry != null && lockOwner.equals(lockEntry.getLockOwner())) { |
||||
return true; |
||||
} |
||||
JdbcRegistryLockDTO jdbcRegistryLock = JdbcRegistryLockDTO.builder() |
||||
.lockKey(lockKey) |
||||
.clientId(clientId) |
||||
.lockOwner(lockOwner) |
||||
.createTime(new Date()) |
||||
.build(); |
||||
try { |
||||
jdbcRegistryLockRepository.insert(jdbcRegistryLock); |
||||
if (jdbcRegistryLock != null) { |
||||
jdbcRegistryLockHolderMap.put(lockKey, LockEntry.builder() |
||||
.lockKey(lockKey) |
||||
.lockOwner(lockOwner) |
||||
.jdbcRegistryLock(jdbcRegistryLock) |
||||
.build()); |
||||
return true; |
||||
} |
||||
log.debug("{} acquire the lock {} success", lockOwner, lockKey); |
||||
} catch (DuplicateKeyException duplicateKeyException) { |
||||
// The lock is already exist, wait it release.
|
||||
continue; |
||||
} |
||||
log.debug("Acquire the lock {} failed try again", lockKey); |
||||
// acquire failed, wait and try again
|
||||
ThreadUtils.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis()); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
@Override |
||||
public void releaseJdbcRegistryLock(Long clientId, String lockKey) { |
||||
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey); |
||||
if (lockEntry == null) { |
||||
return; |
||||
} |
||||
if (!clientId.equals(lockEntry.getJdbcRegistryLock().getClientId())) { |
||||
throw new UnsupportedOperationException( |
||||
"The client " + clientId + " is not the lock owner of the lock: " + lockKey); |
||||
} |
||||
jdbcRegistryLockRepository.deleteById(lockEntry.getJdbcRegistryLock().getId()); |
||||
jdbcRegistryLockHolderMap.remove(lockKey); |
||||
} |
||||
|
||||
@Data |
||||
@Builder |
||||
@NoArgsConstructor |
||||
@AllArgsConstructor |
||||
public static class LockEntry { |
||||
|
||||
private String lockKey; |
||||
private String lockOwner; |
||||
private JdbcRegistryLockDTO jdbcRegistryLock; |
||||
} |
||||
} |
@ -0,0 +1,387 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.client.IJdbcRegistryClient; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.client.JdbcRegistryClientIdentify; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryClientHeartbeatDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryClientRepository; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataChanceEventRepository; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryDataRepository; |
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryLockRepository; |
||||
import org.apache.dolphinscheduler.registry.api.RegistryException; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.CopyOnWriteArrayList; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.SneakyThrows; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.common.collect.Lists; |
||||
|
||||
/** |
||||
* The JdbcRegistryServer will manage the client, once a client is disconnected, the server will remove the client from the registry, and remove it's related data and lock. |
||||
*/ |
||||
@Slf4j |
||||
public class JdbcRegistryServer implements IJdbcRegistryServer { |
||||
|
||||
private final JdbcRegistryProperties jdbcRegistryProperties; |
||||
|
||||
private final JdbcRegistryDataRepository jdbcRegistryDataRepository; |
||||
|
||||
private final JdbcRegistryLockRepository jdbcRegistryLockRepository; |
||||
|
||||
private final JdbcRegistryClientRepository jdbcRegistryClientRepository; |
||||
|
||||
private final JdbcRegistryDataManager jdbcRegistryDataManager; |
||||
|
||||
private final JdbcRegistryLockManager jdbcRegistryLockManager; |
||||
|
||||
private JdbcRegistryServerState jdbcRegistryServerState; |
||||
|
||||
private final List<IJdbcRegistryClient> jdbcRegistryClients = new CopyOnWriteArrayList<>(); |
||||
|
||||
private final List<ConnectionStateListener> connectionStateListeners = new CopyOnWriteArrayList<>(); |
||||
|
||||
private final Map<JdbcRegistryClientIdentify, JdbcRegistryClientHeartbeatDTO> jdbcRegistryClientDTOMap = |
||||
new ConcurrentHashMap<>(); |
||||
|
||||
private Long lastSuccessHeartbeat; |
||||
|
||||
public JdbcRegistryServer(JdbcRegistryDataRepository jdbcRegistryDataRepository, |
||||
JdbcRegistryLockRepository jdbcRegistryLockRepository, |
||||
JdbcRegistryClientRepository jdbcRegistryClientRepository, |
||||
JdbcRegistryDataChanceEventRepository jdbcRegistryDataChanceEventRepository, |
||||
JdbcRegistryProperties jdbcRegistryProperties) { |
||||
this.jdbcRegistryDataRepository = checkNotNull(jdbcRegistryDataRepository); |
||||
this.jdbcRegistryLockRepository = checkNotNull(jdbcRegistryLockRepository); |
||||
this.jdbcRegistryClientRepository = checkNotNull(jdbcRegistryClientRepository); |
||||
this.jdbcRegistryProperties = checkNotNull(jdbcRegistryProperties); |
||||
this.jdbcRegistryDataManager = new JdbcRegistryDataManager( |
||||
jdbcRegistryProperties, jdbcRegistryDataRepository, jdbcRegistryDataChanceEventRepository); |
||||
this.jdbcRegistryLockManager = new JdbcRegistryLockManager( |
||||
jdbcRegistryProperties, jdbcRegistryLockRepository); |
||||
this.jdbcRegistryServerState = JdbcRegistryServerState.INIT; |
||||
lastSuccessHeartbeat = System.currentTimeMillis(); |
||||
} |
||||
|
||||
@Override |
||||
public void start() { |
||||
if (jdbcRegistryServerState != JdbcRegistryServerState.INIT) { |
||||
// The server is already started or stopped, will not start again.
|
||||
return; |
||||
} |
||||
// Purge the previous client to avoid the client is still in the registry.
|
||||
purgePreviousJdbcRegistryClient(); |
||||
// Start the Purge thread
|
||||
// The Purge thread will remove the client from the registry, and remove it's related data and lock.
|
||||
// Connect to the database, load the data and lock.
|
||||
purgeDeadJdbcRegistryClient(); |
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor() |
||||
.scheduleWithFixedDelay(this::purgeDeadJdbcRegistryClient, |
||||
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), |
||||
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
jdbcRegistryDataManager.start(); |
||||
jdbcRegistryServerState = JdbcRegistryServerState.STARTED; |
||||
doTriggerOnConnectedListener(); |
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor() |
||||
.scheduleWithFixedDelay(this::refreshClientsHeartbeat, |
||||
0, |
||||
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), |
||||
TimeUnit.MILLISECONDS); |
||||
} |
||||
|
||||
@SneakyThrows |
||||
@Override |
||||
public void registerClient(IJdbcRegistryClient jdbcRegistryClient) { |
||||
checkNotNull(jdbcRegistryClient); |
||||
|
||||
JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify(); |
||||
checkNotNull(jdbcRegistryClientIdentify); |
||||
|
||||
JdbcRegistryClientHeartbeatDTO registryClientDTO = JdbcRegistryClientHeartbeatDTO.builder() |
||||
.id(jdbcRegistryClientIdentify.getClientId()) |
||||
.clientName(jdbcRegistryClientIdentify.getClientName()) |
||||
.clientConfig( |
||||
new JdbcRegistryClientHeartbeatDTO.ClientConfig( |
||||
jdbcRegistryProperties.getSessionTimeout().toMillis())) |
||||
.createTime(new Date()) |
||||
.lastHeartbeatTime(System.currentTimeMillis()) |
||||
.build(); |
||||
|
||||
while (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) { |
||||
log.warn("The client {} is already exist the registry.", jdbcRegistryClientIdentify.getClientId()); |
||||
Thread.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis()); |
||||
} |
||||
jdbcRegistryClientRepository.insert(registryClientDTO); |
||||
jdbcRegistryClients.add(jdbcRegistryClient); |
||||
jdbcRegistryClientDTOMap.put(jdbcRegistryClientIdentify, registryClientDTO); |
||||
} |
||||
|
||||
@Override |
||||
public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) { |
||||
checkNotNull(jdbcRegistryClient); |
||||
jdbcRegistryClients.remove(jdbcRegistryClient); |
||||
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify()); |
||||
|
||||
JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify(); |
||||
checkNotNull(jdbcRegistryClientIdentify); |
||||
|
||||
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId())); |
||||
} |
||||
|
||||
@Override |
||||
public JdbcRegistryServerState getServerState() { |
||||
return jdbcRegistryServerState; |
||||
} |
||||
|
||||
@Override |
||||
public void subscribeConnectionStateChange(ConnectionStateListener connectionStateListener) { |
||||
checkNotNull(connectionStateListener); |
||||
connectionStateListeners.add(connectionStateListener); |
||||
} |
||||
|
||||
@Override |
||||
public void subscribeJdbcRegistryDataChange(JdbcRegistryDataChangeListener jdbcRegistryDataChangeListener) { |
||||
checkNotNull(jdbcRegistryDataChangeListener); |
||||
jdbcRegistryDataManager.subscribeRegistryRowChange( |
||||
new IRegistryRowChangeNotifier.RegistryRowChangeListener<JdbcRegistryDataDTO>() { |
||||
|
||||
@Override |
||||
public void onRegistryRowUpdated(JdbcRegistryDataDTO data) { |
||||
jdbcRegistryDataChangeListener.onJdbcRegistryDataChanged(data.getDataKey(), |
||||
data.getDataValue()); |
||||
} |
||||
|
||||
@Override |
||||
public void onRegistryRowAdded(JdbcRegistryDataDTO data) { |
||||
jdbcRegistryDataChangeListener.onJdbcRegistryDataAdded(data.getDataKey(), data.getDataValue()); |
||||
} |
||||
|
||||
@Override |
||||
public void onRegistryRowDeleted(JdbcRegistryDataDTO data) { |
||||
jdbcRegistryDataChangeListener.onJdbcRegistryDataDeleted(data.getDataKey()); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public boolean existJdbcRegistryDataKey(String key) { |
||||
return jdbcRegistryDataManager.existKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<JdbcRegistryDataDTO> getJdbcRegistryDataByKey(String key) { |
||||
return jdbcRegistryDataManager.getRegistryDataByKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public List<JdbcRegistryDataDTO> listJdbcRegistryDataChildren(String key) { |
||||
return jdbcRegistryDataManager.listJdbcRegistryDataChildren(key); |
||||
} |
||||
|
||||
@Override |
||||
public void putJdbcRegistryData(Long clientId, String key, String value, DataType dataType) { |
||||
jdbcRegistryDataManager.putJdbcRegistryData(clientId, key, value, dataType); |
||||
} |
||||
|
||||
@Override |
||||
public void deleteJdbcRegistryDataByKey(String key) { |
||||
jdbcRegistryDataManager.deleteJdbcRegistryDataByKey(key); |
||||
} |
||||
|
||||
@Override |
||||
public void acquireJdbcRegistryLock(Long clientId, String lockKey) { |
||||
try { |
||||
jdbcRegistryLockManager.acquireJdbcRegistryLock(clientId, lockKey); |
||||
} catch (Exception ex) { |
||||
throw new RegistryException("Acquire the lock: " + lockKey + " error", ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeout) { |
||||
try { |
||||
return jdbcRegistryLockManager.acquireJdbcRegistryLock(clientId, lockKey, timeout); |
||||
} catch (Exception ex) { |
||||
throw new RegistryException("Acquire the lock: " + lockKey + " error", ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void releaseJdbcRegistryLock(Long clientId, String lockKey) { |
||||
try { |
||||
jdbcRegistryLockManager.releaseJdbcRegistryLock(clientId, lockKey); |
||||
} catch (Exception ex) { |
||||
throw new RegistryException("Release the lock: " + lockKey + " error", ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
jdbcRegistryServerState = JdbcRegistryServerState.STOPPED; |
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdown(); |
||||
List<Long> clientIds = jdbcRegistryClients.stream() |
||||
.map(IJdbcRegistryClient::getJdbcRegistryClientIdentify) |
||||
.map(JdbcRegistryClientIdentify::getClientId) |
||||
.collect(Collectors.toList()); |
||||
doPurgeJdbcRegistryClientInDB(clientIds); |
||||
jdbcRegistryClients.clear(); |
||||
jdbcRegistryClientDTOMap.clear(); |
||||
} |
||||
|
||||
private void purgePreviousJdbcRegistryClient() { |
||||
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) { |
||||
return; |
||||
} |
||||
List<Long> previousJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll() |
||||
.stream() |
||||
.filter(jdbcRegistryClientHeartbeat -> jdbcRegistryClientHeartbeat.getClientName() |
||||
.equals(jdbcRegistryProperties.getJdbcRegistryClientName())) |
||||
.map(JdbcRegistryClientHeartbeatDTO::getId) |
||||
.collect(Collectors.toList()); |
||||
doPurgeJdbcRegistryClientInDB(previousJdbcRegistryClientIds); |
||||
|
||||
} |
||||
|
||||
private void purgeDeadJdbcRegistryClient() { |
||||
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) { |
||||
return; |
||||
} |
||||
List<Long> deadJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll() |
||||
.stream() |
||||
.filter(JdbcRegistryClientHeartbeatDTO::isDead) |
||||
.map(JdbcRegistryClientHeartbeatDTO::getId) |
||||
.collect(Collectors.toList()); |
||||
doPurgeJdbcRegistryClientInDB(deadJdbcRegistryClientIds); |
||||
|
||||
} |
||||
|
||||
private void doPurgeJdbcRegistryClientInDB(List<Long> jdbcRegistryClientIds) { |
||||
if (CollectionUtils.isEmpty(jdbcRegistryClientIds)) { |
||||
return; |
||||
} |
||||
log.info("Begin to delete dead jdbcRegistryClient: {}", jdbcRegistryClientIds); |
||||
jdbcRegistryDataRepository.deleteEphemeralDateByClientIds(jdbcRegistryClientIds); |
||||
jdbcRegistryLockRepository.deleteByClientIds(jdbcRegistryClientIds); |
||||
jdbcRegistryClientRepository.deleteByIds(jdbcRegistryClientIds); |
||||
log.info("Success delete dead jdbcRegistryClient: {}", jdbcRegistryClientIds); |
||||
} |
||||
|
||||
private void refreshClientsHeartbeat() { |
||||
if (CollectionUtils.isEmpty(jdbcRegistryClients)) { |
||||
return; |
||||
} |
||||
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) { |
||||
log.warn("The JdbcRegistryServer is STOPPED, will not refresh clients: {} heartbeat.", |
||||
CollectionUtils.collect(jdbcRegistryClients, IJdbcRegistryClient::getJdbcRegistryClientIdentify)); |
||||
return; |
||||
} |
||||
// Refresh the client's term
|
||||
try { |
||||
long now = System.currentTimeMillis(); |
||||
for (IJdbcRegistryClient jdbcRegistryClient : jdbcRegistryClients) { |
||||
JdbcRegistryClientHeartbeatDTO jdbcRegistryClientHeartbeatDTO = |
||||
jdbcRegistryClientDTOMap.get(jdbcRegistryClient.getJdbcRegistryClientIdentify()); |
||||
if (jdbcRegistryClientHeartbeatDTO == null) { |
||||
// This may occur when the data in db has been deleted, but the client is still alive.
|
||||
log.error( |
||||
"The client {} is not found in the registry, will not refresh it's term. (This may happen when the client is removed from the db)", |
||||
jdbcRegistryClient.getJdbcRegistryClientIdentify().getClientId()); |
||||
continue; |
||||
} |
||||
JdbcRegistryClientHeartbeatDTO clone = jdbcRegistryClientHeartbeatDTO.clone(); |
||||
clone.setLastHeartbeatTime(now); |
||||
jdbcRegistryClientRepository.updateById(jdbcRegistryClientHeartbeatDTO); |
||||
jdbcRegistryClientHeartbeatDTO.setLastHeartbeatTime(clone.getLastHeartbeatTime()); |
||||
} |
||||
if (jdbcRegistryServerState == JdbcRegistryServerState.SUSPENDED) { |
||||
jdbcRegistryServerState = JdbcRegistryServerState.STARTED; |
||||
doTriggerReconnectedListener(); |
||||
} |
||||
lastSuccessHeartbeat = now; |
||||
log.debug("Success refresh clients: {} heartbeat.", |
||||
CollectionUtils.collect(jdbcRegistryClients, IJdbcRegistryClient::getJdbcRegistryClientIdentify)); |
||||
} catch (Exception ex) { |
||||
log.error("Failed to refresh the client's term", ex); |
||||
switch (jdbcRegistryServerState) { |
||||
case STARTED: |
||||
jdbcRegistryServerState = JdbcRegistryServerState.SUSPENDED; |
||||
break; |
||||
case SUSPENDED: |
||||
if (System.currentTimeMillis() - lastSuccessHeartbeat > jdbcRegistryProperties.getSessionTimeout() |
||||
.toMillis()) { |
||||
jdbcRegistryServerState = JdbcRegistryServerState.DISCONNECTED; |
||||
doTriggerOnDisConnectedListener(); |
||||
} |
||||
break; |
||||
default: |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void doTriggerReconnectedListener() { |
||||
log.info("Trigger:onReconnected listener."); |
||||
connectionStateListeners.forEach(listener -> { |
||||
try { |
||||
listener.onReconnected(); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onReconnected failed", ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void doTriggerOnConnectedListener() { |
||||
log.info("Trigger:onConnected listener."); |
||||
connectionStateListeners.forEach(listener -> { |
||||
try { |
||||
listener.onConnected(); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onConnected failed", ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void doTriggerOnDisConnectedListener() { |
||||
log.info("Trigger:onDisConnected listener."); |
||||
connectionStateListeners.forEach(listener -> { |
||||
try { |
||||
listener.onDisConnected(); |
||||
} catch (Exception ex) { |
||||
log.error("Trigger:onDisConnected failed", ex); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,46 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc.server; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; |
||||
|
||||
public enum JdbcRegistryServerState { |
||||
/** |
||||
* Once the {@link JdbcRegistryServer} created, it will be in the INIT state. |
||||
*/ |
||||
INIT, |
||||
/** |
||||
* After the {@link JdbcRegistryServer} started, it will be in the STARTED state. |
||||
*/ |
||||
STARTED, |
||||
/** |
||||
* Once the {@link JdbcRegistryServer} cannot connect to DB, it will be in the SUSPENDED state, and if it can reconnect to DB in {@link JdbcRegistryProperties#getSessionTimeout()} ()}, |
||||
* it will be changed to the STARTED state again. |
||||
*/ |
||||
SUSPENDED, |
||||
/** |
||||
* If the {@link JdbcRegistryServer} cannot connected to DB in {@link JdbcRegistryProperties#getSessionTimeout()}, it will be in the DISCONNECTED state. |
||||
*/ |
||||
DISCONNECTED, |
||||
/** |
||||
* If the {@link JdbcRegistryServer} closed, it will be in the STOPPED state. |
||||
*/ |
||||
STOPPED, |
||||
; |
||||
|
||||
} |
@ -0,0 +1,43 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import static com.google.common.truth.Truth.assertThat; |
||||
import static org.awaitility.Awaitility.await; |
||||
|
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
class JdbcRegistryThreadFactoryTest { |
||||
|
||||
@Test |
||||
void getDefaultSchedulerThreadExecutor() { |
||||
ScheduledExecutorService schedulerThreadExecutor = |
||||
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); |
||||
AtomicInteger atomicInteger = new AtomicInteger(0); |
||||
for (int i = 0; i < 100; i++) { |
||||
schedulerThreadExecutor.scheduleWithFixedDelay(atomicInteger::incrementAndGet, 0, 1, TimeUnit.SECONDS); |
||||
} |
||||
await() |
||||
.atMost(10, TimeUnit.SECONDS) |
||||
.untilAsserted(() -> assertThat(atomicInteger.get()).isEqualTo(100)); |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.plugin.registry.jdbc; |
||||
|
||||
import static com.google.common.truth.Truth.assertThat; |
||||
import static com.google.common.truth.Truth.assertWithMessage; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
class LockUtilsTest { |
||||
|
||||
@Test |
||||
void getLockOwner() { |
||||
assertThat(LockUtils.getLockOwner()).isNotNull(); |
||||
assertWithMessage("Lock owner should not change") |
||||
.that(LockUtils.getLockOwner()) |
||||
.isEqualTo(LockUtils.getLockOwner()); |
||||
} |
||||
} |
@ -1,31 +0,0 @@
|
||||
#!/bin/bash |
||||
# |
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
|
||||
BIN_DIR=$(dirname $0) |
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/../..; pwd)} |
||||
|
||||
if [ "$DOCKER" != "true" ]; then |
||||
source "$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh" |
||||
fi |
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"} |
||||
|
||||
$JAVA_HOME/bin/java $JAVA_OPTS \ |
||||
-cp "$DOLPHINSCHEDULER_HOME/tools/conf":"$DOLPHINSCHEDULER_HOME/tools/libs/*":"$DOLPHINSCHEDULER_HOME/tools/sql" \ |
||||
-Dspring.profiles.active=${DATABASE} \ |
||||
org.apache.dolphinscheduler.tools.command.CommandApplication |
@ -1,152 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.tools.command; |
||||
|
||||
import org.apache.dolphinscheduler.dao.DaoConfiguration; |
||||
import org.apache.dolphinscheduler.dao.plugin.api.dialect.DatabaseDialect; |
||||
|
||||
import java.sql.Connection; |
||||
import java.sql.SQLException; |
||||
import java.sql.Statement; |
||||
|
||||
import javax.sql.DataSource; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.boot.CommandLineRunner; |
||||
import org.springframework.boot.SpringApplication; |
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; |
||||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.DbType; |
||||
|
||||
// todo: use spring-shell to manage the command
|
||||
@SpringBootApplication |
||||
@ImportAutoConfiguration(DaoConfiguration.class) |
||||
public class CommandApplication { |
||||
|
||||
public static void main(String[] args) { |
||||
SpringApplication.run(CommandApplication.class, args); |
||||
} |
||||
|
||||
@Component |
||||
@Slf4j |
||||
static class JdbcRegistrySchemaInitializeCommand implements CommandLineRunner { |
||||
|
||||
@Autowired |
||||
private DatabaseDialect databaseDialect; |
||||
|
||||
@Autowired |
||||
private DbType dbType; |
||||
|
||||
@Autowired |
||||
private DataSource dataSource; |
||||
|
||||
JdbcRegistrySchemaInitializeCommand() { |
||||
} |
||||
|
||||
@Override |
||||
public void run(String... args) throws Exception { |
||||
if (databaseDialect.tableExists("t_ds_jdbc_registry_data") |
||||
|| databaseDialect.tableExists("t_ds_jdbc_registry_lock")) { |
||||
log.warn("t_ds_jdbc_registry_data/t_ds_jdbc_registry_lock already exists"); |
||||
return; |
||||
} |
||||
if (dbType == DbType.MYSQL) { |
||||
jdbcRegistrySchemaInitializeInMysql(); |
||||
} else if (dbType == DbType.POSTGRE_SQL) { |
||||
jdbcRegistrySchemaInitializeInPG(); |
||||
} else { |
||||
log.error("Unsupported database type: {}", dbType); |
||||
} |
||||
} |
||||
|
||||
private void jdbcRegistrySchemaInitializeInMysql() throws SQLException { |
||||
try ( |
||||
Connection connection = dataSource.getConnection(); |
||||
Statement statement = connection.createStatement()) { |
||||
statement.execute("CREATE TABLE `t_ds_jdbc_registry_data`\n" + |
||||
"(\n" + |
||||
" `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',\n" + |
||||
" `data_key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',\n" + |
||||
" `data_value` text NOT NULL COMMENT 'data, like zookeeper node value',\n" + |
||||
" `data_type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',\n" |
||||
+ |
||||
" `last_term` bigint NOT NULL COMMENT 'last term time',\n" + |
||||
" `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',\n" |
||||
+ |
||||
" `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',\n" |
||||
+ |
||||
" PRIMARY KEY (`id`),\n" + |
||||
" unique (`data_key`)\n" + |
||||
") ENGINE = InnoDB\n" + |
||||
" DEFAULT CHARSET = utf8;"); |
||||
|
||||
statement.execute("CREATE TABLE `t_ds_jdbc_registry_lock`\n" + |
||||
"(\n" + |
||||
" `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',\n" + |
||||
" `lock_key` varchar(256) NOT NULL COMMENT 'lock path',\n" + |
||||
" `lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',\n" + |
||||
" `last_term` bigint NOT NULL COMMENT 'last term time',\n" + |
||||
" `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',\n" |
||||
+ |
||||
" `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',\n" |
||||
+ |
||||
" PRIMARY KEY (`id`),\n" + |
||||
" unique (`lock_key`)\n" + |
||||
") ENGINE = InnoDB\n" + |
||||
" DEFAULT CHARSET = utf8;"); |
||||
} |
||||
} |
||||
|
||||
private void jdbcRegistrySchemaInitializeInPG() throws SQLException { |
||||
try ( |
||||
Connection connection = dataSource.getConnection(); |
||||
Statement statement = connection.createStatement()) { |
||||
statement.execute("create table t_ds_jdbc_registry_data\n" + |
||||
"(\n" + |
||||
" id serial\n" + |
||||
" constraint t_ds_jdbc_registry_data_pk primary key,\n" + |
||||
" data_key varchar not null,\n" + |
||||
" data_value text not null,\n" + |
||||
" data_type int4 not null,\n" + |
||||
" last_term bigint not null,\n" + |
||||
" last_update_time timestamp default current_timestamp not null,\n" + |
||||
" create_time timestamp default current_timestamp not null\n" + |
||||
");"); |
||||
statement.execute( |
||||
"create unique index t_ds_jdbc_registry_data_key_uindex on t_ds_jdbc_registry_data (data_key);"); |
||||
statement.execute("create table t_ds_jdbc_registry_lock\n" + |
||||
"(\n" + |
||||
" id serial\n" + |
||||
" constraint t_ds_jdbc_registry_lock_pk primary key,\n" + |
||||
" lock_key varchar not null,\n" + |
||||
" lock_owner varchar not null,\n" + |
||||
" last_term bigint not null,\n" + |
||||
" last_update_time timestamp default current_timestamp not null,\n" + |
||||
" create_time timestamp default current_timestamp not null\n" + |
||||
");"); |
||||
statement.execute( |
||||
"create unique index t_ds_jdbc_registry_lock_key_uindex on t_ds_jdbc_registry_lock (lock_key);"); |
||||
} |
||||
} |
||||
|
||||
} |
||||
} |
Loading…
Reference in new issue