停止正在运行的dag
触发某个dag执行时,需要填写参数,其中execution_date参数是必须要填写的,而且该参数不能和以前提交的任务参数相同。
#获取运行任务的信息
从以上输出结果可以看出,任务已经在队列中排队了。其中返回的dag_run_id字段,就是基于静态的dag生成的运行的实体的id。可以根据dag_run_id来查看该dag运行时的状态信息。在业务层面,该dag_run_id往往会和某个具体的业务计算的任务关联起来,这样就可以对业务层的任务进行重试,停止等操作。
RESTAPI实战获取dag列表信息以下请求获取airflow中所有的dag的信息。通过该命令会返回一个dag的列表。
从以上获取到的结果可以看出,该任务执行成功了。
1创建访问用户添加一个user1用户通过以下命令来添加一个user1用户,使得他可以通过RESTAPI来访问airflow。
airflow users create -u user1 -p user1 -r Admin -f firstname -l lastname -e user1@example.org
一种是通过接口来直接停止所有的taskinstance;注意:这种方式在airflow-0才开始支持,提供了一个接口用来停止某个正在运行的dag实例。在最新的airflow-0的文档中,还没有该接口的说明,但在接口说明的yaml文件中可以找到该接口。
通过该命令根据airflow的REST接口来触发某个dag去执行。该命令会返回dag运行时的id。如下:
{
"conf": {},
"dag_id": "xhz_hello_world_dag",
"dag_run_id": "manual__2021-10-09T03:28:59+00:00",
"end_date": null,
"execution_date": "2021-10-09T03:28:59+00:00",
"external_trigger": true,
"start_date": null,
"state": "queued"
}
获取运行任务的信息可以通过触发任务时,获取到的dag_run_id的值,来获取运行时任务的状态信息。
触发某个dag的执行
若没有任何问题,就会得到目前airflow中所有的dag的列表,和每个dag的基本信息。其输出如下:
{
"dags": [
{
"dag_id": "example_bash_operator",
"description": null,
"file_token": ".eJw9xzESgjAQBdC70MMvUnASa-YDK2SM2Z3doOb2Wtm9N-AW4gGXflWoNbBy07ozQeorYD3NKHn9oZ1a0zQjcpPRuD14SIDZ70XfkA-fVmTZecQ_K-Nc1MTZ1CfrwxcF1CnD.riJklZV8hUinhuzuGUd-a-AxMG8",
"fileloc": "/Users/reyun/opt/anaconda3/envs/py37/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py",
"is_active": true,
"is_paused": true,
"is_subdag": false,
"owners": [
"airflow"
],
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "0 0 * * *"
},
"tags": [
{
"name": "example"
},
{
"name": "example2"
}
]
},
{
"dag_id": "example_branch_datetime_operator_2",
"description": null,
"file_token": ".eJw9yDsSgzAMRdG90AcVLlhJakbAC3hi
...
},
{
"dag_id": "xhz_hello_world_dag",
"description": "zxh first DAG",
"file_token": "Ii9Vc2Vycy9yZXl1bi9haXJmbG93L2RhZ3MvaGVsbG9fd29ybGQucHki.Mu2a930AhcRoi1WYEO8JE1Dh1r0",
"fileloc": "/Users/reyun/airflow/dags/hello_world.py",
"is_active": true,
"is_paused": true,
"is_subdag": false,
"owners": [
"zxh"
],
"root_dag_id": null,
"schedule_interval": {
"__type": "TimeDelta",
"days": 1,
"microseconds": 0,
"seconds": 0
},
"tags": []
}
],
"total_entries": 33
}
通过RESTAPI接口,有两种方式可以用来停止一个正在运行的dag:
一种是停止某个dag_run_id下的taskinstance;但要注意,这种方式可能无法及时停止所有正在并行运行的taskinstance。对于依赖简单的dag,可以使用这种方式来停止正在运行的dag。
配置并创建用户修改配置文件修改配置文件修改配置文件airflow.cfg,把auth_backend选项的值修改成以下值。
auth_backend = airflow.api.auth.backend.basic_auth
这种方式需要先找到把哪个taskinstance设置成failed状态。所以需要先获取dag_run_id的taskinstance情况,然后找到可以设置状态的taskinstance,然后才能设置状态。
第一种方式
在该命令中:-u是用户名;-p是密码;-f是firstname;-l是lastname;
这种方式会把正在执行的任务设置成failed状态,已经执行成功的任务的状态不会改变。
得到的输出结果如下:
{
"conf": {},
"dag_id": "zxh_hello_world_dag",
"dag_run_id": "manual__2021-10-09T03:28:59+00:00",
"end_date": "2021-10-09T07:29:07.851392+00:00",
"execution_date": "2021-10-09T03:28:59+00:00",
"external_trigger": true,
"start_date": "2021-10-09T07:28:59.887252+00:00",
"state": "success"
}
-r是角色;-e是邮箱。
这些参数最好都填上,否则会创建失败。
文章为作者独立观点,不代表股票配资公司观点