Prefect是一个开源的Python工作流引擎,旨在简化和自动化数据工程和数据科学任务的编排和调度。它提供了一个易于使用的工作流定义和管理框架,使用户能够构建复杂的工作流程,处理依赖关系,并确保任务的顺序和并发执行。
下面是Prefect的一些关键特性:
声明式定义:Prefect允许您以声明式的方式定义工作流。您可以使用Python编写工作流代码,并以自然语言的方式描述任务之间的依赖关系。这样,您可以将精力集中在定义任务和流程逻辑上,而无需关注底层调度和执行细节。
任务编排:Prefect提供了丰富的任务编排功能,可以灵活地定义任务之间的依赖关系和运行顺序。您可以使用简单的装饰器来定义任务,以及任务之间的依赖关系。Prefect将自动处理任务的并发性和顺序执行,确保任务按正确的顺序运行。
可视化界面:Prefect提供了一个直观的Web界面,用于可视化工作流的结构和执行情况。您可以通过界面查看工作流的整体结构,监控任务的运行状态和历史记录,并在需要时进行干预和调整。
跨平台支持:Prefect可以在多种执行环境中运行,包括本地机器、云服务和容器。它与各种基础设施和工具集成,如Docker、Kubernetes、Airflow等,使您能够在不同的环境中灵活地部署和执行工作流。
弹性调度:Prefect提供了弹性调度功能,可以根据任务的资源需求和优先级,动态地分配和管理计算资源。它可以自动扩展和缩减计算资源,确保任务能够在适当的时间和地点运行。
强大的扩展性:Prefect具有丰富的插件和扩展机制,可以根据需要进行自定义和扩展。您可以编写自己的任务和调度器,以满足特定的需求,并与其他工具和服务进行集成。
总的来说,Prefect是一个功能强大、易于使用的工作流引擎,可以帮助您更好地管理和编排数据工程和数据科学任务。无论是处理大规模数据管道还是构建复杂的数据处理流程,Prefect都能为您提供灵活、可靠和可扩展的解决方案。
简单地讲,它就是一个python中的方法。Flow用来承载业务逻辑,帮助用户与流程进行交互和写作的一个容器。使用@flow装饰器表示方法。有如下优点:捕捉State transitions;校验输入;失败重试;超时控制;日志捕捉。Flow调用task,task之间不能互相调用,flow之间可以互相调用(subflow概念)。
from prefect import flow, task
@task
def print_hello(name):
print(f"Hello {name}!")
@flow(name="Hello Flow")
def hello_world(name="world"):
print_hello(name)
设置(参数):
description:可选
name:可选
retries:可选,失败重试次数
retry_delay_seconds:重试延迟
flow_run_name:引擎注册名称
task_runner:默认采用ConcurrentTaskRunner
timeout_seconds:超时设置
validate_parameters:是否校验
version:版本
组合(subflow):
Flow内可调用其他flow,这时候被调用的flow是subflow或者childflow,当subflow被调用时,会创建一个新的task runner。subflow会阻塞parentflow的运行。当然也可以并行运行,参考AnyIO task groups 或者asyncio.gather这些异步方式。
如果task之间互相调用,务必采用如下方式:task.fn(),此种调用不被推荐,会缺失掉很多系统支持比如:重试,日志追踪等。
from prefect import flow, task
@task
def my_first_task(msg):
print(f"Hello, {msg}")
@task
def my_second_task(msg):
my_first_task.fn(msg)
@flow
def my_flow():
my_second_task("Trillian")
设置(参数):
name:可选
description:可选
tags:可选,方便后台管理界面进行过滤查询
cache_key_fn:上下文缓存
cache_expiration:上下文缓存时间
task_run_name:模版名称,可使用变量或者其他方法(返回string)
retries:
retry_delay_seconds:
version:
Caching:使用caching可以利用其他任务的计算结果以提高效率(输入参数相同的情况,对task设计要求较高,同时要求尽可能解藕同时合理控制task的粒度)
处理task和flow的返回值。当直接调用flow或者task时,result用来接受返回值。
result可接受异步处理结果。
result也可进行持久化。当运行工作流时,Prefect会保存所有task和flow的结果(默认在内存中)以方便传给下游,这时当对象过大时,需要进行相应持久化。
Artifacts(数据样例)可以把flow运行的结果通过多种形式进行持久化并输出,形式包括tables,files 或者links。通过artifacts可以方便的管理和共享信息。
使用场景:Debugging;Data quality checks;Documentation
我们从States中可以得知哪些信息呢?
状态类型(表示结果)
状态类型(表示过程)
返回值类型
使用Block能完成与外部系统之间的交互。其中包括AWS,GitHub,Slack等。
也可以按照规范自定义新的block类型。
Variables允许存储和重用数据,包括配置信息等。可以理解成环境变量。
Prefect提供多种默认的task runner供选择,可以根据自己的需求选择不同的runner,其中包括concurrent, parallel, distributed等。通过调用task.submit()向任务运行器task runner提交一个任务并返回PrefectFuture。
内置的task runners:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time
@task
def stop_at_floor(floor):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow(task_runner=ConcurrentTaskRunner())
def elevator():
for floor in range(10, 0, -1):
stop_at_floor.submit(floor)
flow和subflow的task ruanner之间可以不同。
使用runtime context可以查看当前flow或者task等当前prefect引擎下的对象内部信息。
from prefect import flow, task
import prefect.runtime
@flow(log_prints=True)
def my_flow(x):
print("My name is", prefect.runtime.flow_run.name)
print("I belong to deployment", prefect.runtime.deployment.name)
my_task(2)
@task
def my_task(y):
print("My name is", prefect.runtime.task_run.name)
print("Flow run parameters:", prefect.runtime.flow_run.parameters)
my_flow(1)
通过修改Prefect引擎的默认设置,可以定制其运行的特点。支持命令行和Prefect自带的后台管理进行修改。
可设置的参数包括:
参考:https://docs.prefect.io/latest/concepts/flows/
全部评论