Superset 如何注册新类型数据库详情

如果想要集成新类型的数据库,最简单的办法,就是查看

官网上是否有什么支持的https://superset.apache.org/docs/configuration/databases/

安装之后,就可以在datasource

列表下选择对应的database进行创建

而进一步到了Superset的框架之中,主要的数据源支持分为了两方面

一方案是需要安装driver,从而实现具体的驱动,另一方面是需要声明数据库详情。从而进行诸如细粒度校验入参,查询结果类型转换这些操作。

对于如何安装driver,只需要参考官网的方式,通过pip进行安装进行管理。

pip install pydruid

而对于如何声明EngineSpec,则需要我们查看Superset项目下的db_engine_specs文件夹。

这里我们先查看如何获取所有数据库EngineSpec,再查看EngineSpec中可以通过继承的方式修改哪些功能。

对于获取所有的EngineSpec的接口,存放在了目录下__init__.py文件中的get_available_engine_specs函数中

在这函数之中,首先是加载了所有sqlachemy原生支持的数据库和第三方安装的driver,获取到drive名称以及支持的drive,添加到defaultdict中

最后是加载当前文件夹下的数据库详情文件,配合上面的defaultdict,形成{数据库详情:dirver列表}

这里我们以最后一方面,加载当前文件夹下的数据库详情文件为准,查看如何进行加载的。

for engine_spec in load_engine_specs():

driver = drivers[engine_spec.engine]

# do not add denied db engine specs to available list

dbs_denylist = app.config[“DBS_AVAILABLE_DENYLIST”]

dbs_denylist_engines = dbs_denylist.keys()

if (

engine_spec.engine in dbs_denylist_engines

and hasattr(engine_spec, “default_driver”)

and engine_spec.default_driver in dbs_denylist[engine_spec.engine]

):

continue

# lookup driver by engine aliases.

if not driver and engine_spec.engine_aliases:

for alias in engine_spec.engine_aliases:

driver = drivers[alias]

if driver:

break

available_engines[engine_spec] = driver

首先是加载所有的engine_specs

然后是判断是否有需要拒绝的,以及是否有别名的,根据上述条件进行跳过。

最后拼接为一个map,其中key为数据库Engine_Spec, Value是drive数组。

那么加载所有的engine_specs位于本类中的load_engine_specs函数中。

def load_engine_specs() -> List[Type[BaseEngineSpec]]:

“””
Load all engine specs, native and 3rd party.
“””
engine_specs: List[Type[BaseEngineSpec]] = []

# load standard engines

db_engine_spec_dir = str(Path(__file__).parent)

for module_info in pkgutil.iter_modules([db_engine_spec_dir], prefix=”.”):

module = import_module(module_info.name, package=__name__)

engine_specs.extend(

getattr(module, attr)

for attr in module.__dict__

if is_engine_spec(getattr(module, attr))

)

# load additional engines from external modules

for ep in iter_entry_points(“superset.db_engine_specs”):

try:

engine_spec = ep.load()

except Exception: # pylint: disable=broad-except

logger.warning(“Unable to load Superset DB engine spec: %s”, ep.name)

continue

engine_specs.append(engine_spec)

return engine_specs

上述代码中,首先是加载了所有本目录下的文件,根据其是否是py文件并且是否继承了BaseEngineSpec来决定是否添加到engine_specs之中。

然后是尝试从外部目录加载engine。

最后进行返回。

其实从上面可以看出,Superset会先根据现在已经安装可以使用的driver入手,先加载driver以及对应的默认engine,然后再根据项目中可以使用的Engine_Spec, 来给drive进行增强,从而在返回给前端的时候,可以根据EngineSpec是否存在Driver来展示可以创建哪些数据库。

那么在EngineSpec内部,存在着哪些函数呢?

这里我们以MysqlEngineSpec为例,看看他在继承了BaseEngineSpec之后,都通过实现哪些接口,增强了哪些能力。

首先是声明了engine_name,表明是对应MySQL的driver。

然后是 column_type_mapping

column_type_mappings = (

(

re.compile(r”^int.*”, re.IGNORECASE),

INTEGER(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^tinyint”, re.IGNORECASE),

TINYINT(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^mediumint”, re.IGNORECASE),

MEDIUMINT(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^decimal”, re.IGNORECASE),

DECIMAL(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^float”, re.IGNORECASE),

FLOAT(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^double”, re.IGNORECASE),

DOUBLE(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^bit”, re.IGNORECASE),

BIT(),

GenericDataType.NUMERIC,

),

(

re.compile(r”^tinytext”, re.IGNORECASE),

TINYTEXT(),

GenericDataType.STRING,

),

(

re.compile(r”^mediumtext”, re.IGNORECASE),

MEDIUMTEXT(),

GenericDataType.STRING,

),

(

re.compile(r”^longtext”, re.IGNORECASE),

LONGTEXT(),

GenericDataType.STRING,

),

)

让用户在获取Table中column详情的时候,可以判断出column对应的数据类型。

之后是_time_grain_expressions,声明时间间隔对应的sql

_time_grain_expressions = {

None: “{col}”,

“PT1S”: “DATE_ADD(DATE({col}), ”

“INTERVAL (HOUR({col})*60*60 + MINUTE({col})*60″

” + SECOND({col})) SECOND)”,

“PT1M”: “DATE_ADD(DATE({col}), ”

“INTERVAL (HOUR({col})*60 + MINUTE({col})) MINUTE)”,

“PT1H”: “DATE_ADD(DATE({col}), ” “INTERVAL HOUR({col}) HOUR)”,

“P1D”: “DATE({col})”,

“P1W”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFWEEK({col}) – 1 DAY))”,

“P1M”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFMONTH({col}) – 1 DAY))”,

“P3M”: “MAKEDATE(YEAR({col}), 1) ”

“+ INTERVAL QUARTER({col}) QUARTER – INTERVAL 1 QUARTER”,

“P1Y”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFYEAR({col}) – 1 DAY))”,

“1969-12-29T00:00:00Z/P1W”: “DATE(DATE_SUB({col}, ”

“INTERVAL DAYOFWEEK(DATE_SUB({col}, ”

“INTERVAL 1 DAY)) – 1 DAY))”,

}

之后是是定义custom_errors

以及重写convert_dttm函数,这个函数的主要作用是支持用户按需将自己原本是str的字段转换为date或者datetime使用,这里就是封装相关sql使用

@classmethod

def convert_dttm(

cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None

) -> Optional[str]:

sqla_type = cls.get_sqla_column_type(target_type)

if isinstance(sqla_type, types.Date):

return f”STR_TO_DATE(‘{dttm.date().isoformat()}’, ‘%Y-%m-%d’)”

if isinstance(sqla_type, types.DateTime):

datetime_formatted = dttm.isoformat(sep=” “, timespec=”microseconds”)

return f”””STR_TO_DATE(‘{datetime_formatted}’, ‘%Y-%m-%d %H:%i:%s.%f’)”””

return None

转换字段为unix时间戳

@classmethod

def epoch_to_dttm(cls) -> str:

return “from_unixtime({col})”

进行执行sql的取消

@classmethod

def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:

“””
Get MySQL connection ID that will be used to cancel all other running
queries in the same connection.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: MySQL Connection ID
“””
cursor.execute(“SELECT CONNECTION_ID()”)

row = cursor.fetchone()

return row[0]

@classmethod

def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:

“””
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: MySQL Connection ID
:return: True if query cancelled successfully, False otherwise
“””
try:

cursor.execute(f”KILL CONNECTION {cancel_query_id}”)

except Exception: # pylint: disable=broad-except

return False

return True

当然,BaseEngineSpec中还包含很多其他的函数可供继承实现。MySQL也只是实现了一部分。

MySQLEngineSpec除了继承BaseEngineSpec之外,还通过BasicParametersMixin进行类增强。

在BasicParametersMixin之中,提供了构建sql uri的能力。

@classmethod

def build_sqlalchemy_uri( # pylint: disable=unused-argument

cls,

parameters: BasicParametersType,

encrypted_extra: Optional[Dict[str, str]] = None,

) -> str:

# make a copy so that we don’t update the original

query = parameters.get(“query”, {}).copy()

if parameters.get(“encryption”):

if not cls.encryption_parameters:

raise Exception(“Unable to build a URL with encryption enabled”)

query.update(cls.encryption_parameters)

return str(

URL(

f”{cls.engine}+{cls.default_driver}”.rstrip(“+”), # type: ignore

username=parameters.get(“username”),

password=parameters.get(“password”),

host=parameters[“host”],

port=parameters[“port”],

database=parameters[“database”],

query=query,

)

)

根据uri解析参数的能力

@classmethod

def get_parameters_from_uri( # pylint: disable=unused-argument

cls, uri: str, encrypted_extra: Optional[Dict[str, Any]] = None

) -> BasicParametersType:

url = make_url_safe(uri)

query = {

key: value

for (key, value) in url.query.items()

if (key, value) not in cls.encryption_parameters.items()

}

encryption = all(

item in url.query.items() for item in cls.encryption_parameters.items()

)

return {

“username”: url.username,

“password”: url.password,

“host”: url.host,

“port”: url.port,

“database”: url.database,

“query”: query,

“encryption”: encryption,

}

校验传入参数的能力

@classmethod

def validate_parameters(

cls, properties: BasicPropertiesType

) -> List[SupersetError]:

“””
Validates any number of parameters, for progressive validation.

If only the hostname is present it will check if the name is resolvable. As more
parameters are present in the request, more validation is done.
“””
errors: List[SupersetError] = []

required = {“host”, “port”, “username”, “database”}

parameters = properties.get(“parameters”, {})

present = {key for key in parameters if parameters.get(key, ())}

missing = sorted(required – present)

if missing:

errors.append(

SupersetError(

message=f’One or more parameters are missing: {“, “.join(missing)}’,

error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,

level=ErrorLevel.WARNING,

extra={“missing”: missing},

),

)

host = parameters.get(“host”, None)

if not host:

return errors

if not is_hostname_valid(host):

errors.append(

SupersetError(

message=”The hostname provided can’t be resolved.”,

error_type=SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR,

level=ErrorLevel.ERROR,

extra={“invalid”: [“host”]},

),

)

return errors

port = parameters.get(“port”, None)

if not port:

return errors

try:

port = int(port)

except (ValueError, TypeError):

errors.append(

SupersetError(

message=”Port must be a valid integer.”,

error_type=SupersetErrorType.CONNECTION_INVALID_PORT_ERROR,

level=ErrorLevel.ERROR,

extra={“invalid”: [“port”]},

),

)

if not (isinstance(port, int) and 0 <= port < 2 ** 16):

errors.append(

SupersetError(

message=(

“The port must be an integer between 0 and 65535 ”

“(inclusive).”

),

error_type=SupersetErrorType.CONNECTION_INVALID_PORT_ERROR,

level=ErrorLevel.ERROR,

extra={“invalid”: [“port”]},

),

)

elif not is_port_open(host, port):

errors.append(

SupersetError(

message=”The port is closed.”,

error_type=SupersetErrorType.CONNECTION_PORT_CLOSED_ERROR,

level=ErrorLevel.ERROR,

extra={“invalid”: [“port”]},

),

)

return errors

声明所需参数的能力

@classmethod

def parameters_json_schema(cls) -> Any:

“””
Return configuration parameters as OpenAPI.
“””
if not cls.parameters_schema:

return None

spec = APISpec(

title=”Database Parameters”,

version=”1.0.0″,

openapi_version=”3.0.2″,

plugins=[MarshmallowPlugin()],

)

spec.components.schema(cls.__name__, schema=cls.parameters_schema)

return spec.to_dict()[“components”][“schemas”][cls.__name__]

同样我们也可以继承并书写自己的BasicParametersMixin

从而增强属于自己的EngineSpec。

那么这一章,到此我们就说了Superset如何去读取可以使用的数据库,以及如何利用内部声明的EngineSpec来给数据库查询进行赋能的全过程。

发表评论

邮箱地址不会被公开。 必填项已用*标注