Python: schedule模块,Python 周期任务用法教程

Python 实用宝典

import schedule

from datetime import datetime, timedelta, time


def job():

print(‘Boo’)


每个小时运行作业,18:30后停止

schedule.every(1).hours.until(“18:30”).do(job)


每个小时运行作业,2030-01-01 18:33 today

schedule.every(1).hours.until(“2030-01-01 18:33”).do(job)


每个小时运行作业,8个小时后停止

schedule.every(1).hours.until(timedelta(hours=8)).do(job)


每个小时运行作业,11:32:42后停止

schedule.every(1).hours.until(time(11, 33, 42)).do(job)


每个小时运行作业,2020-5-17 11:36:20后停止

schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)


截止日期之后,该作业将无法运行。


立即运行所有作业,而不管其安排如何


如果某个机制触发了,你需要立即运行所有作业,可以调用 schedule.run_all() :


Python 实用宝典

import schedule


def job_1():

print(‘Foo’)


def job_2():

print(‘Bar’)


schedule.every().monday.at(“12:40”).do(job_1)

schedule.every().tuesday.at(“16:40”).do(job_2)

schedule.run_all()


立即运行所有作业,每次作业间隔10秒

schedule.run_all(delay_seconds=10

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:


Python 实用宝典

from schedule import every, repeat, run_pending

import time


此装饰器效果等同于 schedule.every(10).minutes.do(job)

@repeat(every(10).minutes)

def job():

print(“I am a scheduled job”)

while True:

run_pending()

time.sleep(1)

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:


Python 实用宝典

import threading

import time

import schedule

def job1():

print(“I’m running on thread %s” % threading.current_thread())

def job2():

print(“I’m running on thread %s” % threading.current_thread())

def job3():

print(“I’m running on thread %s” % threading.current_thread())


def run_threaded(job_func):

job_thread = threading.Thread(target=job_func)

job_thread.start()


schedule.every(10).seconds.do(run_threaded, job1)

schedule.every(10).seconds.do(run_threaded, job2)

schedule.every(10).seconds.do(run_threaded, job3)


while True:

schedule.run_pending()

time.sleep(1)


日志记录


Schedule 模块同时也支持 logging 日志记录,这么使用:


Python 实用宝典

import schedule

import logging


logging.basicConfig()

schedule_logger = logging.getLogger(‘schedule’)


日志级别为DEBUG

schedule_logger.setLevel(level=logging.DEBUG)


def job():

print(“Hello, Logs”)


schedule.every().second.do(job)


schedule.run_all()


schedule.clear()


效果如下:


DEBUG:schedule:Running all 1 jobs with 0s delay in between

DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})

Hello, Logs

DEBUG:schedule:Deleting all jobs


异常处理


Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。


你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:


Python 实用宝典

import functools

def catch_exceptions(cancel_on_failure=False):

def catch_exceptions_decorator(job_func):

@functools.wraps(job_func)

def wrapper(*args, **kwargs):

try:

return job_func(*args, **kwargs)

except:

import traceback

print(traceback.format_exc())

if cancel_on_failure:

return schedule.CancelJob

return wrapper

return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)

def bad_task():

return 1 / 0

schedule.every(5).minutes.do(bad_task)


白俊遥博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论