Superset 如何注册新类型数据库详情
如果想要集成新类型的数据库,最简单的办法,就是查看
官网上是否有什么支持的https://superset.apache.org/docs/configuration/databases/
安装之后,就可以在datasource
列表下选择对应的database进行创建
而进一步到了Superset的框架之中,主要的数据源支持分为了两方面
一方案是需要安装driver,从而实现具体的驱动,另一方面是需要声明数据库详情。从而进行诸如细粒度校验入参,查询结果类型转换这些操作。
对于如何安装driver,只需要参考官网的方式,通过pip进行安装进行管理。
pip install pydruid
而对于如何声明EngineSpec,则需要我们查看Superset项目下的db_engine_specs文件夹。
这里我们先查看如何获取所有数据库EngineSpec,再查看EngineSpec中可以通过继承的方式修改哪些功能。
对于获取所有的EngineSpec的接口,存放在了目录下__init__.py文件中的get_available_engine_specs函数中
在这函数之中,首先是加载了所有sqlachemy原生支持的数据库和第三方安装的driver,获取到drive名称以及支持的drive,添加到defaultdict中
最后是加载当前文件夹下的数据库详情文件,配合上面的defaultdict,形成{数据库详情:dirver列表}
这里我们以最后一方面,加载当前文件夹下的数据库详情文件为准,查看如何进行加载的。
for engine_spec in load_engine_specs():
driver = drivers[engine_spec.engine] # do not add denied db engine specs to available list dbs_denylist = app.config[“DBS_AVAILABLE_DENYLIST”] dbs_denylist_engines = dbs_denylist.keys() if ( engine_spec.engine in dbs_denylist_engines and hasattr(engine_spec, “default_driver”) and engine_spec.default_driver in dbs_denylist[engine_spec.engine] ): continue # lookup driver by engine aliases. if not driver and engine_spec.engine_aliases: for alias in engine_spec.engine_aliases: driver = drivers[alias] if driver: break available_engines[engine_spec] = driver |
首先是加载所有的engine_specs
然后是判断是否有需要拒绝的,以及是否有别名的,根据上述条件进行跳过。
最后拼接为一个map,其中key为数据库Engine_Spec, Value是drive数组。
那么加载所有的engine_specs位于本类中的load_engine_specs函数中。
def load_engine_specs() -> List[Type[BaseEngineSpec]]:
“”” # load standard engines db_engine_spec_dir = str(Path(__file__).parent) for module_info in pkgutil.iter_modules([db_engine_spec_dir], prefix=”.”): module = import_module(module_info.name, package=__name__) engine_specs.extend( getattr(module, attr) for attr in module.__dict__ if is_engine_spec(getattr(module, attr)) ) # load additional engines from external modules for ep in iter_entry_points(“superset.db_engine_specs”): try: engine_spec = ep.load() except Exception: # pylint: disable=broad-except logger.warning(“Unable to load Superset DB engine spec: %s”, ep.name) continue engine_specs.append(engine_spec) return engine_specs |
上述代码中,首先是加载了所有本目录下的文件,根据其是否是py文件并且是否继承了BaseEngineSpec来决定是否添加到engine_specs之中。
然后是尝试从外部目录加载engine。
最后进行返回。
其实从上面可以看出,Superset会先根据现在已经安装可以使用的driver入手,先加载driver以及对应的默认engine,然后再根据项目中可以使用的Engine_Spec, 来给drive进行增强,从而在返回给前端的时候,可以根据EngineSpec是否存在Driver来展示可以创建哪些数据库。
那么在EngineSpec内部,存在着哪些函数呢?
这里我们以MysqlEngineSpec为例,看看他在继承了BaseEngineSpec之后,都通过实现哪些接口,增强了哪些能力。
首先是声明了engine_name,表明是对应MySQL的driver。
然后是 column_type_mapping
column_type_mappings = (
( re.compile(r”^int.*”, re.IGNORECASE), INTEGER(), GenericDataType.NUMERIC, ), ( re.compile(r”^tinyint”, re.IGNORECASE), TINYINT(), GenericDataType.NUMERIC, ), ( re.compile(r”^mediumint”, re.IGNORECASE), MEDIUMINT(), GenericDataType.NUMERIC, ), ( re.compile(r”^decimal”, re.IGNORECASE), DECIMAL(), GenericDataType.NUMERIC, ), ( re.compile(r”^float”, re.IGNORECASE), FLOAT(), GenericDataType.NUMERIC, ), ( re.compile(r”^double”, re.IGNORECASE), DOUBLE(), GenericDataType.NUMERIC, ), ( re.compile(r”^bit”, re.IGNORECASE), BIT(), GenericDataType.NUMERIC, ), ( re.compile(r”^tinytext”, re.IGNORECASE), TINYTEXT(), GenericDataType.STRING, ), ( re.compile(r”^mediumtext”, re.IGNORECASE), MEDIUMTEXT(), GenericDataType.STRING, ), ( re.compile(r”^longtext”, re.IGNORECASE), LONGTEXT(), GenericDataType.STRING, ), ) |
让用户在获取Table中column详情的时候,可以判断出column对应的数据类型。
之后是_time_grain_expressions,声明时间间隔对应的sql
_time_grain_expressions = {
None: “{col}”, “PT1S”: “DATE_ADD(DATE({col}), ” “INTERVAL (HOUR({col})*60*60 + MINUTE({col})*60″ ” + SECOND({col})) SECOND)”, “PT1M”: “DATE_ADD(DATE({col}), ” “INTERVAL (HOUR({col})*60 + MINUTE({col})) MINUTE)”, “PT1H”: “DATE_ADD(DATE({col}), ” “INTERVAL HOUR({col}) HOUR)”, “P1D”: “DATE({col})”, “P1W”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFWEEK({col}) – 1 DAY))”, “P1M”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFMONTH({col}) – 1 DAY))”, “P3M”: “MAKEDATE(YEAR({col}), 1) ” “+ INTERVAL QUARTER({col}) QUARTER – INTERVAL 1 QUARTER”, “P1Y”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFYEAR({col}) – 1 DAY))”, “1969-12-29T00:00:00Z/P1W”: “DATE(DATE_SUB({col}, ” “INTERVAL DAYOFWEEK(DATE_SUB({col}, ” “INTERVAL 1 DAY)) – 1 DAY))”, } |
之后是是定义custom_errors
以及重写convert_dttm函数,这个函数的主要作用是支持用户按需将自己原本是str的字段转换为date或者datetime使用,这里就是封装相关sql使用
@classmethod
def convert_dttm( cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None ) -> Optional[str]: sqla_type = cls.get_sqla_column_type(target_type) if isinstance(sqla_type, types.Date): return f”STR_TO_DATE(‘{dttm.date().isoformat()}’, ‘%Y-%m-%d’)” if isinstance(sqla_type, types.DateTime): datetime_formatted = dttm.isoformat(sep=” “, timespec=”microseconds”) return f”””STR_TO_DATE(‘{datetime_formatted}’, ‘%Y-%m-%d %H:%i:%s.%f’)””” return None |
转换字段为unix时间戳
@classmethod
def epoch_to_dttm(cls) -> str: return “from_unixtime({col})” |
进行执行sql的取消
@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]: “”” :param cursor: Cursor instance in which the query will be executed row = cursor.fetchone() return row[0] @classmethod def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool: “”” :param cursor: New cursor instance to the db of the query cursor.execute(f”KILL CONNECTION {cancel_query_id}”) except Exception: # pylint: disable=broad-except return False return True |
当然,BaseEngineSpec中还包含很多其他的函数可供继承实现。MySQL也只是实现了一部分。
MySQLEngineSpec除了继承BaseEngineSpec之外,还通过BasicParametersMixin进行类增强。
在BasicParametersMixin之中,提供了构建sql uri的能力。
@classmethod
def build_sqlalchemy_uri( # pylint: disable=unused-argument cls, parameters: BasicParametersType, encrypted_extra: Optional[Dict[str, str]] = None, ) -> str: # make a copy so that we don’t update the original query = parameters.get(“query”, {}).copy() if parameters.get(“encryption”): if not cls.encryption_parameters: raise Exception(“Unable to build a URL with encryption enabled”) query.update(cls.encryption_parameters) return str( URL( f”{cls.engine}+{cls.default_driver}”.rstrip(“+”), # type: ignore username=parameters.get(“username”), password=parameters.get(“password”), host=parameters[“host”], port=parameters[“port”], database=parameters[“database”], query=query, ) ) |
根据uri解析参数的能力
@classmethod
def get_parameters_from_uri( # pylint: disable=unused-argument cls, uri: str, encrypted_extra: Optional[Dict[str, Any]] = None ) -> BasicParametersType: url = make_url_safe(uri) query = { key: value for (key, value) in url.query.items() if (key, value) not in cls.encryption_parameters.items() } encryption = all( item in url.query.items() for item in cls.encryption_parameters.items() ) return { “username”: url.username, “password”: url.password, “host”: url.host, “port”: url.port, “database”: url.database, “query”: query, “encryption”: encryption, } |
校验传入参数的能力
@classmethod
def validate_parameters( cls, properties: BasicPropertiesType ) -> List[SupersetError]: “”” If only the hostname is present it will check if the name is resolvable. As more required = {“host”, “port”, “username”, “database”} parameters = properties.get(“parameters”, {}) present = {key for key in parameters if parameters.get(key, ())} missing = sorted(required – present) if missing: errors.append( SupersetError( message=f’One or more parameters are missing: {“, “.join(missing)}’, error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, level=ErrorLevel.WARNING, extra={“missing”: missing}, ), ) host = parameters.get(“host”, None) if not host: return errors if not is_hostname_valid(host): errors.append( SupersetError( message=”The hostname provided can’t be resolved.”, error_type=SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR, level=ErrorLevel.ERROR, extra={“invalid”: [“host”]}, ), ) return errors port = parameters.get(“port”, None) if not port: return errors try: port = int(port) except (ValueError, TypeError): errors.append( SupersetError( message=”Port must be a valid integer.”, error_type=SupersetErrorType.CONNECTION_INVALID_PORT_ERROR, level=ErrorLevel.ERROR, extra={“invalid”: [“port”]}, ), ) if not (isinstance(port, int) and 0 <= port < 2 ** 16): errors.append( SupersetError( message=( “The port must be an integer between 0 and 65535 ” “(inclusive).” ), error_type=SupersetErrorType.CONNECTION_INVALID_PORT_ERROR, level=ErrorLevel.ERROR, extra={“invalid”: [“port”]}, ), ) elif not is_port_open(host, port): errors.append( SupersetError( message=”The port is closed.”, error_type=SupersetErrorType.CONNECTION_PORT_CLOSED_ERROR, level=ErrorLevel.ERROR, extra={“invalid”: [“port”]}, ), ) return errors |
声明所需参数的能力
@classmethod
def parameters_json_schema(cls) -> Any: “”” return None spec = APISpec( title=”Database Parameters”, version=”1.0.0″, openapi_version=”3.0.2″, plugins=[MarshmallowPlugin()], ) spec.components.schema(cls.__name__, schema=cls.parameters_schema) return spec.to_dict()[“components”][“schemas”][cls.__name__] |
同样我们也可以继承并书写自己的BasicParametersMixin
从而增强属于自己的EngineSpec。
那么这一章,到此我们就说了Superset如何去读取可以使用的数据库,以及如何利用内部声明的EngineSpec来给数据库查询进行赋能的全过程。