airflow的定时任务

摘要:
代码显示:importosfromtatetimeimportdatetimeimportpytzfromfairflowimportDAGfromfairlow.modelimportVariablefromfairflow.operators.http_operatorimportSimpleHttpOperatorfromfairlow.operator.python _operatorimp

airflow的定时任务第1张

代码展示 : 

import os
from datetime import datetime

import pytz
from airflow import DAG
from airflow.models import Variable
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook

# 设置第一次触发任务时间 及 设置任务执行的时区
local_tz = pendulum.timezone("Asia/Shanghai")
start_date = datetime.datetime(2016, 1, 1, tzinfo=local_tz)

# dag默认参数
args = {
    "owner": "Rgc",  # 任务拥有人
    "depends_on_past": False,  # 是否依赖过去执行此任务的结果,如果为True,则过去任务必须成功,才能执行此次任务
    "start_date": start_date,  # 任务开始执行时间
}

# 定义一个DAG
# 参数catchup指 是否填充执行 start_date到现在 未执行的缺少任务;如:start_date定义为2019-10-10,现在是2019-10-29,任务是每天定时执行一次,
# 如果此参数设置为True,则 会生成 10号到29号之间的19点执行此任务;如果设置为False,则不会补充执行任务;
# schedule_interval:定时执行方式,推荐使用如下字符串方式, 方便写出定时规则的网址:https://crontab.guru/
# 也可以使用  :  schedule_interval= datetime.timedelta(minutes=5)  5分钟执行一次
dag = DAG(
    dag_id="HttpSendDag",
    catchup=False,
    default_args=args,
    schedule_interval="0 19 * * *"
)


# 触发任务的时候的某些参数 , ds=today
def func(ds, next_ds, run_id, **kwargs):
    pass


# 调动Python相关函数
t1 = PythonOperator(
    task_id='deviation_new',
    python_callable=func,
    provide_context=True,
    dag=dag
)

注意:当您不想安排DAG时,请使用schedule_interval=None而不是schedule_interval='None'

# 每5分钟执行一次 
schedule_interval="*/5 * * * *"

# 每小时执行一次 
schedule_interval="0 1 * * *"

# 每天8-20点之间,每小时执行一次
schedule_interval="0 8-20 * * *"

免责声明:文章转载自《airflow的定时任务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇error40;无法打开到SQL Server的连接,设置了Tcp/IP等也不能连接的问题多个iframe的刷新问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章