Superset的模板类机制
本章呢,我们将拆解下Superset在父类中封装好的接口,在其中看到父类接口如何利用自定义的filter,在查询过程进行过滤,如何利用传入或者封装的schema来控制返回的结构体。本章我们将全程围绕api中获取对象list的接口,进行相关的拆解。
那么话不多说,我们直接开始。
在Superset之中,关于对象的list查询主要是放在了Flask中app_builder来做。
这里可以看到
class DashboardRestApi(BaseSupersetModelRestApi)
DashboardRestApi类中,其中继承了BaseSupersetModelRestApi
而在BaseSupersetModelRestApi之中,实现了get_list_headless接口
在其中主要是记录了执行时间,并作为log保存下来,主要的实现还是交给了父类的get_list_headless实现。
@handle_api_exception
def get_list_headless(self, **kwargs: Any) -> Response: “”” Add statsd metrics to builtin FAB GET list endpoint “”” duration, response = time_function(super().get_list_headless, **kwargs) self.send_stats_metrics(response, self.get_list.__name__, duration) return response |
而这个get_list_headless的触发,则是存在于flask_appbuilder的BaseApi之中。
是由BaseApi之中的get函数触发的
@expose(“/”, methods=[“GET”])
@protect() @safe @permission_name(“get”) @rison(get_list_schema) @merge_response_func(merge_order_columns, API_ORDER_COLUMNS_RIS_KEY) @merge_response_func(merge_list_label_columns, API_LABEL_COLUMNS_RIS_KEY) @merge_response_func(merge_description_columns, API_DESCRIPTION_COLUMNS_RIS_KEY) @merge_response_func(merge_list_columns, API_LIST_COLUMNS_RIS_KEY) @merge_response_func(merge_list_title, API_LIST_TITLE_RIS_KEY) def get_list(self, **kwargs: Any) -> Response: “””Get list of items from Model |
在其中就是触发了get_list_headless函数
而在真正实现get_list逻辑的get_list_headless函数中,则是会根据request,获取到相对应的list列表。
在get_list_headless之中,则是首先拼接返回的格式体。
这里尝试从入参之中获取参数中传入的字段数组。
select_cols = args.get(API_SELECT_COLUMNS_RIS_KEY, [])
pruned_select_cols = [col for col in select_cols if col in self.list_columns] |
传入参数是一个数组,譬如
https://localhost:8088/api/v1/dataset/?q=(columns:!(datasource_name,datasource_id),keys:!(none),order_column:table_name,order_direction:asc,page:0,page_size:100) |
在获取到入参之中的字段数组之后,还会匹配self.list_columns中的值,进行过滤。
在之后就拼装为返回的schema对象。
if pruned_select_cols:
list_model_schema = self.model2schemaconverter.convert(pruned_select_cols) else: list_model_schema = self.list_model_schema |
这里我们首先看下self.list_columns的声明方式,这里我们拿dashboards的api来看
list_columns = [
“id”, “published”, “status”, “slug”, “url”, “css”, “position_json”, “json_metadata”, “thumbnail_url”, “certified_by”, “certification_details”, “changed_by.first_name”, “changed_by.last_name”, “changed_by.username”, “changed_by.id”, “changed_by_name”, “changed_by_url”, “changed_on_utc”, “changed_on_delta_humanized”, “created_on_delta_humanized”, “created_by.first_name”, “created_by.id”, “created_by.last_name”, “dashboard_title”, “owners.id”, “owners.username”, “owners.first_name”, “owners.last_name”, “owners.email”, “roles.id”, “roles.name”, “is_managed_externally”, ] |
上面之中,一种有三种声明方式,
除了可以声明实体中的字段,也可以声明关联实体中的字段,以及类中的函数。
比如关连实体中的字段
owners = relationship(security_manager.user_model, secondary=dashboard_user)
从而以数组的方式,返回关联实体之中的特定字段
除此之外,对应的声明函数,从而在返回的时候触发函数调用
@renders(“changed_on”)
def changed_on_utc(self) -> str: # Convert naive datetime to UTC return self.changed_on.astimezone(pytz.utc).strftime(“%Y-%m-%dT%H:%M:%S.%f%z”) |
那么回到get_list_headless函数中,在获取到了list_model_schema之后。
会处理所有的过滤器进行处理
joined_filters = self._handle_filters_args(args)
在类中,会根据传入的filters字段,组装所有的filters,
self._filters.rest_add_filters(rison_args.get(API_FILTERS_RIS_KEY, []))
q=(filters:!((col:owners,opr:rel_m_m,value:25))
获取到的是一个字典数组。
def rest_add_filters(self, data: List[Dict]) -> None:
“”” :param data: list of dicts try: opr = _filter[“opr”] col = _filter[“col”] value = _filter[“value”] except KeyError: log.warning(“Invalid filter”) return # Get filter class from defaults filter_class = map_args_filter.get(opr, None) if filter_class: if col not in self.search_columns: raise InvalidColumnFilterFABException( f”Filter column: {col} not allowed to filter” ) elif not self._rest_check_valid_filter_operation(col, opr): raise InvalidOperationFilterFABException( f”Filter operation: {opr} not allowed on column: {col}” ) self.add_filter(col, filter_class, value) continue # Get filter class from custom defined filters filters = self._search_filters.get(col) if filters: for filter in filters: if filter.arg_name == opr: self.add_filter(col, filter, value) break else: raise InvalidOperationFilterFABException( f”Filter operation: {opr} not allowed on column: {col}” ) |
在函数之中,会从声明的搜索过滤器中进行匹配,如果col和opr匹配上了,就会加到当前类中的filters数组中
def add_filter(self, column_name, filter_class, value):
self._add_filter(filter_class(column_name, self.datamodel), value) return self |
最终加上self.base_filters,作为一个数组一同返回。
这里我们看一个filters的实现。我们拿dashboard的title filter作为举例对象
“dashboard_title”: [DashboardTitleOrSlugFilter]
class DashboardTitleOrSlugFilter(BaseFilter): # pylint: disable=too-few-public-methods name = _(“Title or Slug”) arg_name = “title_or_slug” def apply(self, query: Query, value: Any) -> Query: if not value: return query ilike_value = f”%{value}%” return query.filter( or_( Dashboard.dashboard_title.ilike(ilike_value), Dashboard.slug.ilike(ilike_value), ) ) |
其中入参的col 为 dashboard_title ,opr 为 title_or_slug
然后在apply中,将入参中的val作为value传了进去。
其中进行了拼接,在sql查询中,增加了新的filter条件,在or中分别对dashboard_title和slug字段进行了模糊匹配。
在filter准备完毕,
解析来的代码又解析除了orderby的字段和offset,limit字段
try:
order_column, order_direction = self._handle_order_args(args) except InvalidOrderByColumnFABException as e: return self.response_400(message=str(e)) # handle pagination page_index, page_size = self._handle_page_args(args) |
之后就是进行上述准备好的字段进行查询。
count, lst = self.datamodel.query(
joined_filters, order_column, order_direction, page=page_index, page_size=page_size, select_columns=self.list_select_columns, outer_default_load=self.list_outer_default_load, ) |
在query方法中,主要逻辑为
if not self.session:
raise InterfaceQueryWithoutSession() query = self.session.query(self.obj) count = self.query_count(query, filters, select_columns) query = self.apply_all( query, filters, order_column, order_direction, page, page_size, select_columns, ) query_results = query.all() result = [] for item in query_results: if hasattr(item, self.obj.__name__): result.append(getattr(item, self.obj.__name__)) else: return count, query_results return count, result |
先不增加offset和limit,orderby字段,进行一次增加filter的count(*)查询,获取到数据量。
然后增加上orderby,limit等信息,进行一次查询,获取当前页的实体对象数组。
最终将整数数据量和对象数组一同返回。
交给上层的get_list_headless函数,最后利用上面准备好的list_model_schema进行dump之后,带上count整数一同返回。
pks = self.datamodel.get_keys(lst)
response[API_RESULT_RES_KEY] = list_model_schema.dump(lst, many=True) response[“ids”] = pks response[“count”] = count self.pre_get_list(response) return self.response(200, **response) |
那么上面就是我们get_list的全过程。
在这个函数中,一共暴露了两个可供修改的点,分别是list_column和search_filter
我们可以在list_column中通过声明函数或者在对象中增加新的字段的方式,来增加返回的信息。
也可以在search_filter之中增加新的过滤器,来修改定义属于自己的搜索方式。
那么,这就是superset中如何利用flask模板类,完成搜索的相关讲解。