Superset如何利用Celery进行定时任务介绍

关于Celery任务的介绍,其如何和Flask框架的集成,我们可以看如下Flask官网介绍的和Celery框架集成方法。

https://flask.palletsprojects.com/en/2.3.x/patterns/celery/

而回到我们这一章之中,我们可以从Celery任务的集成,新子任务的创建,定时任务的配置这几个方面来讲解。

首先是关于Celery任务的集成,这一点主要突出在项目启动时,CeleryApp的初始化,这一点可以参考前面Superset的启动源码

在启动源码之中,init_app函数之中,存在调用初始化celery_app的代码

def configure_celery(self) -> None:

celery_app.config_from_object(self.config[“CELERY_CONFIG”])

celery_app.set_default()

superset_app = self.superset_app

# Here, we want to ensure that every call into Celery task has an app context

# setup properly

task_base = celery_app.Task

class AppContextTask(task_base): # type: ignore

# pylint: disable=too-few-public-methods

abstract = True

# Grab each call into the task and set up an app context

def __call__(self, *args: Any, **kwargs: Any) -> Any:

with superset_app.app_context():

return task_base.__call__(self, *args, **kwargs)

celery_app.Task = AppContextTask

上述代码之中,主要是组装app

在其中主要就是读取CELERY_CONFIG,并设置进去。

不过这里我们先不看怎么声明Config进行配置,先看下相关代码的实现。

具体的代码存在于superset/tasks/cache.py文件下

其中主要的入口在于@celery_app.task(name=”cache-warmup”)注解下的cache_wamup方法之中,从字面意思来看,这个方法和缓存相关。

在这个方法之中入参明确说明了需要一个strategy_name

更进一步到代码之中,则是根据入参之中的strategy_name来匹配本类中声明的strategy的类名。在获取到class之后,根据配置中声明的username来获取到用户对应的cookie。

通过strategy中fetch_url函数获取到了url之后,利用celery_app注解声明的另一个函数进行调用。

@celery_app.task(name=”cache-warmup”)

def cache_warmup(

strategy_name: str, *args: Any, **kwargs: Any

) -> Union[Dict[str, List[str]], str]:

“””
Warm up cache.

This task periodically hits charts to warm up the cache.

“””
logger.info(“Loading strategy”)

class_ = None

for class_ in strategies:

if class_.name == strategy_name: # type: ignore

break

else:

message = f”No strategy {strategy_name} found!”

logger.error(message, exc_info=True)

return message

logger.info(“Loading %s”, class_.__name__)

try:

strategy = class_(*args, **kwargs)

logger.info(“Success!”)

except TypeError:

message = “Error loading strategy!”

logger.exception(message)

return message

user = security_manager.get_user_by_username(app.config[“THUMBNAIL_SELENIUM_USER”])

cookies = MachineAuthProvider.get_auth_cookies(user)

headers = {“Cookie”: f”session={cookies.get(‘session’, ”)}”}

results: Dict[str, List[str]] = {“scheduled”: [], “errors”: []}

for url in strategy.get_urls():

try:

logger.info(“Scheduling %s”, url)

fetch_url.delay(url, headers)

results[“scheduled”].append(url)

except SchedulingError:

logger.exception(“Error scheduling fetch_url: %s”, url)

results[“errors”].append(url)

return results

这里我们先看一下其中strategy的代码,这里我们以DummyStrategy为例,这个类中的get_urls会返回所有chart的缓存url

class DummyStrategy(Strategy): # pylint: disable=too-few-public-methods

“””
Warm up all charts.

This is a dummy strategy that will fetch all charts. Can be configured by:

beat_schedule = {
‘cache-warmup-hourly’: {
‘task’: ‘cache-warmup’,
‘schedule’: crontab(minute=1, hour=’*’), # @hourly
‘kwargs’: {‘strategy_name’: ‘dummy’},
},
}

“””

name = “dummy”

def get_urls(self) -> List[str]:

session = db.create_scoped_session()

charts = session.query(Slice).all()

return [get_url(chart) for chart in charts]

先查询到了所有的Slice,也就是chart

然后对所有chart进行url拼接。

形成诸如superset/warm_up_cache/?slice_id={chart.id}这种格式的url

获取到之后,就可以利用fetch_url.delay(url, headers)进行调用

这里对应的warm_up_cache的url,就是调用主节点的warm_up_cache接口

此接口本质上就是获取slice的查询结果并放到cache之中。

如果希望使用上述的定时缓存,则可以在config.py或者values.yaml中configOverrides字段下,重写CeleryConfig。

除了配置标准的broker_url,result_backend等信息

就是在beat_schedule中配置相关的任务信息

“cache-warmup-slice”: {

‘task’: “cache-warmup”,

‘schedule’: crontab(minute=1, hour=23, day_of_week=’*’), # @every day of pm 11:01

‘kwargs’: {

‘strategy_name’: ‘dummy’,

},

},

其task一直为cache-warmp

对应的schedule为一个crontab对象

然后kwargs中传入希望调用的’strategy_name

这样完成了对定时任务的声明和使用。

如果希望增加新的自定义任务,则可以要么重写函数并在上面增加注解@celery_app.task(name=”cache-warmup”)

在CeleryConfig中声明使用

要么就是定义一个’strategy,并定义配套的flask接口,从而进行使用。

发表评论

邮箱地址不会被公开。 必填项已用*标注