|
|
@ -17,15 +17,20 @@ |
|
|
|
|
|
|
|
|
|
|
|
"""Module java gateway, contain gateway behavior.""" |
|
|
|
"""Module java gateway, contain gateway behavior.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import contextlib |
|
|
|
|
|
|
|
from logging import getLogger |
|
|
|
from typing import Any, Optional |
|
|
|
from typing import Any, Optional |
|
|
|
|
|
|
|
|
|
|
|
from py4j.java_collections import JavaMap |
|
|
|
from py4j.java_collections import JavaMap |
|
|
|
from py4j.java_gateway import GatewayParameters, JavaGateway |
|
|
|
from py4j.java_gateway import GatewayParameters, JavaGateway |
|
|
|
|
|
|
|
from py4j.protocol import Py4JError |
|
|
|
|
|
|
|
|
|
|
|
from pydolphinscheduler import configuration |
|
|
|
from pydolphinscheduler import __version__, configuration |
|
|
|
from pydolphinscheduler.constants import JavaGatewayDefault |
|
|
|
from pydolphinscheduler.constants import JavaGatewayDefault |
|
|
|
from pydolphinscheduler.exceptions import PyDSJavaGatewayException |
|
|
|
from pydolphinscheduler.exceptions import PyDSJavaGatewayException |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger = getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def launch_gateway( |
|
|
|
def launch_gateway( |
|
|
|
address: Optional[str] = None, |
|
|
|
address: Optional[str] = None, |
|
|
@ -75,6 +80,22 @@ class JavaGate: |
|
|
|
auto_convert: Optional[bool] = True, |
|
|
|
auto_convert: Optional[bool] = True, |
|
|
|
): |
|
|
|
): |
|
|
|
self.java_gateway = launch_gateway(address, port, auto_convert) |
|
|
|
self.java_gateway = launch_gateway(address, port, auto_convert) |
|
|
|
|
|
|
|
gateway_version = "unknown" |
|
|
|
|
|
|
|
with contextlib.suppress(Py4JError): |
|
|
|
|
|
|
|
# 1. Java gateway version is too old: doesn't have method 'getGatewayVersion()' |
|
|
|
|
|
|
|
# 2. Error connecting to Java gateway |
|
|
|
|
|
|
|
gateway_version = self.get_gateway_version() |
|
|
|
|
|
|
|
if gateway_version != __version__: |
|
|
|
|
|
|
|
logger.warning( |
|
|
|
|
|
|
|
f"Using unmatched version of pydolphinscheduler (version {__version__}) " |
|
|
|
|
|
|
|
f"and Java gateway (version {gateway_version}) may cause errors. " |
|
|
|
|
|
|
|
"We strongly recommend you to find the matched version " |
|
|
|
|
|
|
|
"(check: https://pypi.org/project/apache-dolphinscheduler)" |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_gateway_version(self): |
|
|
|
|
|
|
|
"""Get the java gateway version, expected to be equal with pydolphinscheduler.""" |
|
|
|
|
|
|
|
return self.java_gateway.entry_point.getGatewayVersion() |
|
|
|
|
|
|
|
|
|
|
|
def get_datasource_info(self, name: str): |
|
|
|
def get_datasource_info(self, name: str): |
|
|
|
"""Get datasource info through java gateway.""" |
|
|
|
"""Get datasource info through java gateway.""" |
|
|
|