Superset如何利用Celery进行定时任务介绍
关于Celery任务的介绍,其如何和Flask框架的集成,我们可以看如下Flask官网介绍的和Celery框架集成方法。
https://flask.palletsprojects.com/en/2.3.x/patterns/celery/
而回到我们这一章之中,我们可以从Celery任务的集成,新子任务的创建,定时任务的配置这几个方面来讲解。
首先是关于Celery任务的集成,这一点主要突出在项目启动时,CeleryApp的初始化,这一点可以参考前面Superset的启动源码
在启动源码之中,init_app函数之中,存在调用初始化celery_app的代码
def configure_celery(self) -> None:
celery_app.config_from_object(self.config[“CELERY_CONFIG”]) celery_app.set_default() superset_app = self.superset_app # Here, we want to ensure that every call into Celery task has an app context # setup properly task_base = celery_app.Task class AppContextTask(task_base): # type: ignore # pylint: disable=too-few-public-methods abstract = True # Grab each call into the task and set up an app context def __call__(self, *args: Any, **kwargs: Any) -> Any: with superset_app.app_context(): return task_base.__call__(self, *args, **kwargs) celery_app.Task = AppContextTask |
上述代码之中,主要是组装app
在其中主要就是读取CELERY_CONFIG,并设置进去。
不过这里我们先不看怎么声明Config进行配置,先看下相关代码的实现。
具体的代码存在于superset/tasks/cache.py文件下
其中主要的入口在于@celery_app.task(name=”cache-warmup”)注解下的cache_wamup方法之中,从字面意思来看,这个方法和缓存相关。
在这个方法之中入参明确说明了需要一个strategy_name
更进一步到代码之中,则是根据入参之中的strategy_name来匹配本类中声明的strategy的类名。在获取到class之后,根据配置中声明的username来获取到用户对应的cookie。
通过strategy中fetch_url函数获取到了url之后,利用celery_app注解声明的另一个函数进行调用。
@celery_app.task(name=”cache-warmup”)
def cache_warmup( strategy_name: str, *args: Any, **kwargs: Any ) -> Union[Dict[str, List[str]], str]: “”” This task periodically hits charts to warm up the cache. “”” class_ = None for class_ in strategies: if class_.name == strategy_name: # type: ignore break else: message = f”No strategy {strategy_name} found!” logger.error(message, exc_info=True) return message logger.info(“Loading %s”, class_.__name__) try: strategy = class_(*args, **kwargs) logger.info(“Success!”) except TypeError: message = “Error loading strategy!” logger.exception(message) return message user = security_manager.get_user_by_username(app.config[“THUMBNAIL_SELENIUM_USER”]) cookies = MachineAuthProvider.get_auth_cookies(user) headers = {“Cookie”: f”session={cookies.get(‘session’, ”)}”} results: Dict[str, List[str]] = {“scheduled”: [], “errors”: []} for url in strategy.get_urls(): try: logger.info(“Scheduling %s”, url) fetch_url.delay(url, headers) results[“scheduled”].append(url) except SchedulingError: logger.exception(“Error scheduling fetch_url: %s”, url) results[“errors”].append(url) return results |
这里我们先看一下其中strategy的代码,这里我们以DummyStrategy为例,这个类中的get_urls会返回所有chart的缓存url
class DummyStrategy(Strategy): # pylint: disable=too-few-public-methods
“”” This is a dummy strategy that will fetch all charts. Can be configured by: beat_schedule = { “”” name = “dummy” def get_urls(self) -> List[str]: session = db.create_scoped_session() charts = session.query(Slice).all() return [get_url(chart) for chart in charts] |
先查询到了所有的Slice,也就是chart
然后对所有chart进行url拼接。
形成诸如superset/warm_up_cache/?slice_id={chart.id}这种格式的url
获取到之后,就可以利用fetch_url.delay(url, headers)进行调用
这里对应的warm_up_cache的url,就是调用主节点的warm_up_cache接口
此接口本质上就是获取slice的查询结果并放到cache之中。
如果希望使用上述的定时缓存,则可以在config.py或者values.yaml中configOverrides字段下,重写CeleryConfig。
除了配置标准的broker_url,result_backend等信息
就是在beat_schedule中配置相关的任务信息
“cache-warmup-slice”: {
‘task’: “cache-warmup”, ‘schedule’: crontab(minute=1, hour=23, day_of_week=’*’), # @every day of pm 11:01 ‘kwargs’: { ‘strategy_name’: ‘dummy’, }, }, |
其task一直为cache-warmp
对应的schedule为一个crontab对象
然后kwargs中传入希望调用的’strategy_name
这样完成了对定时任务的声明和使用。
如果希望增加新的自定义任务,则可以要么重写函数并在上面增加注解@celery_app.task(name=”cache-warmup”)
在CeleryConfig中声明使用
要么就是定义一个’strategy,并定义配套的flask接口,从而进行使用。