# Copyright 2015 Bloomberg Finance L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library to work with cron/quartz expressions and timezones.

The library provides a way to define schedules attached to timezones and to get
time occurrences out of them by simply iterating over the object created.
See the Schedule class for further details.

The key terms used in the documentation are:
- Schedule: specification of a succession of occurrences
- Occurrence: point in time that satisfies the specification of a schedule

As an example, a schedule is "every Tuesday at 2pm in London";
an occurrence is next Tuesday at 2pm with an offset from UTC of +60 minutes.
"""
import datetime as dt
import itertools
import re
import six
import pytz
from dateutil import rrule
__all__ = ["Schedule", "InvalidExpression"]
# * * * * * *
# | | | | | |
# | | | | | .. year (yyyy or * for any)
# | | | | ...... day of week (1 - 7) (1 to 7 are Monday to Sunday)
# | | | ........... month (1 - 12)
# | | ................ day of month (1 - 31)
# | ..................... hour (0 - 23)
# .......................... min (0 - 59)
class InvalidExpression(Exception):
    """Raised when a cron/quartz expression cannot be parsed."""
class Schedule(six.Iterator):
    """Iterator over the occurrences of a cron specification in a timezone.

    Relying on dateutil.rrule, a Schedule generates the occurrences that
    satisfy a cron expression between a start and an (optional) end datetime.
    Once built, the object is its own iterator: every element it yields is an
    occurrence of the schedule, localized to the timezone of the schedule.

    The class provides no support for occurrences falling in DST change times.
    The candidate is localized with ``is_dst=None``, so an exception is raised
    if an occurrence falls into a DST change period; the iterator has already
    advanced past it at that point, which lets the application decide how to
    handle those situations.

    Filters allow to specify extra conditions an occurrence has to satisfy.
    See the year filter as an example. A good use of them is to skip non
    business days with a calendar.
    """

    def __init__(self, expression, t_zone, start_date=None, end_date=None,
                 filters=None):
        """Creates a schedule definition

        :param expression: cron expression defining the schedule
        :type expression: str
        :param t_zone: timezone we want the schedule to be applied on. It has
                       to provide ``localize(datetime, is_dst=...)``
        :type t_zone: instance of a subclass of tzinfo
        :param start_date: inclusive date to start to generate occurrences.
                           Defaults to now
        :type start_date: datetime (with tzinfo)
        :param end_date: inclusive date of the last occurrence to generate.
                         Defaults to never
        :type end_date: datetime (with tzinfo)
        :param filters: extra functions to filter occurrences. Each one is
                        called with the localized occurrence and returns
                        whether to keep it. The sequence is copied, the
                        object passed in is never modified.
        :type filters: list of callable
        :raises TypeError: if start_date or end_date have no timezone
        """
        start_date = start_date or dt.datetime.now(pytz.utc)  # start defaults to now
        if start_date.tzinfo is None or (end_date and end_date.tzinfo is None):
            raise TypeError("Start and End dates should have a timezone")

        self.t_zone = t_zone
        self.expression = expression
        self.start_date = start_date
        self.end_date = end_date

        # Move both limits to the desired timezone and then strip the tzinfo:
        # the recurrence rule is computed on naive datetime objects.
        start_t = start_date.astimezone(self.t_zone).replace(tzinfo=None)
        end_t = None
        if end_date:
            end_t = end_date.astimezone(self.t_zone).replace(tzinfo=None)

        self._rrule = process(expression, start_t, end_t)
        self.__rrule_iterator = iter(self._rrule)

        # Work on a copy: appending the year filter to the list received
        # would leak it into the caller's list (and into any other schedule
        # built from that same list).
        self.filters = list(filters) if filters else []
        self.filters.append(get_year_filter(self.expression.split(" ")[-1]))

    def __str__(self):
        return "Cron: {} @{} [{}->{}]".format(self.expression, self.t_zone,
                                              self.start_date, self.end_date)

    def __iter__(self):
        return self

    def __next__(self):
        """Returns the next occurrence or raises StopIteration

        This method adds some extra validation for the returned
        iteration that are not natively handled by rrule: the candidate is
        localized to the timezone of the schedule and has to be accepted by
        every filter.
        """
        while True:
            candidate = next(self.__rrule_iterator)
            occurrence = self.t_zone.localize(candidate, is_dst=None)
            # Keep this a list comprehension rather than a generator:
            # - every filter has to run, even after one rejected the
            #   occurrence, so a filter that ends the iteration by raising
            #   StopIteration (the year filter) always gets the chance to.
            # - a StopIteration raised inside a generator expression is
            #   turned into a RuntimeError on recent Python 3 (PEP 479).
            if all([accepts(occurrence) for accepts in self.filters]):
                return occurrence
# Private helpers
class Parser(object):
    """Abstract class to create parsers for parts of quartz expressions

    Each parser can be used per token and a specific parser needs to provide
    the valid ranges of the quartz part and a dict of REPLACEMENTS in upper case
    See the specific parsers below (Ex: MinuteParser, WeekDayParser, etc..)

    All values:
        A star can be used to specify all valid values

    Multiple options:
        Each of the expression parsed can contain a list of expressions as
        a comma separated list. Duplicates are removed.
        Example: 0,1,4 Means 0, 1 and 4

    Ranges:
        A dash can be used to represent ranges. The start of the range can
        not be greater than its end.
        Example: 2-5 Means 2 to 5

    Step:
        A slash can be used to specify a step (1 or greater)
        Example: */2 Means to pick one of every two values.
                 if the valid range is 0 to 3 it will return 0 and 2

    Replacements:
        Each specific parser can define String replacements for the expression.
        Ex: JAN is ok for 1 (Jan) [ Case insensitive ]

    Other examples:
        "1,3-6,8" -> [1, 3, 4, 5, 6, 8].
        "1-3, 0-10/2" -> [0, 1, 2, 3, 4, 6, 8, 10]
    """

    MIN_VALUE = None  # Min value the expression can have
    MAX_VALUE = None  # Max value inclusive the expression can have
    REPLACEMENTS = {}  # String replacements for the expression.

    QUARTZ_REGEXP = re.compile(r"(?P<start>(\d+)|\*)(-(?P<end>\d+))?(/(?P<step>\d+))?")

    @classmethod
    def _parse_item(cls, expression):
        """Parses one of the comma separated expressions within the full quartz

        :param expression: a single item such as "5", "1-10", "*/2" or "JAN-MAR"
        :returns: iterable with the integer values the item stands for
        :raises InvalidExpression: if the item is malformed, has a zero step,
            a range that goes backwards or produces values out of
            MIN_VALUE..MAX_VALUE
        """
        expression = expression.strip().upper()
        for key, value in cls.REPLACEMENTS.items():
            expression = expression.replace(key, value)

        matches = cls.QUARTZ_REGEXP.match(expression)
        # match() only anchors the beginning: without checking where the match
        # ends, an item like "5foo" would be silently accepted as "5".
        if not matches or matches.end() != len(expression):
            raise InvalidExpression("Invalid expression: {}".format(expression))

        start = matches.group("start")
        end = matches.group("end") or start
        step = int(matches.group("step") or 1)
        if start == "*":
            start = cls.MIN_VALUE
            end = cls.MAX_VALUE
        first, last = int(start), int(end)

        if step < 1:
            raise InvalidExpression("Invalid step in expression: {}"
                                    .format(expression))
        if first > last:
            raise InvalidExpression("Invalid range in expression: {}"
                                    .format(expression))

        # Values grow from `first`, so checking the first one and the biggest
        # one produced is enough; no need to walk (or even build) the range.
        biggest = first + ((last - first) // step) * step
        if first < cls.MIN_VALUE or biggest > cls.MAX_VALUE:
            raise InvalidExpression("{} produces items out of {}"
                                    .format(expression, cls.__name__))
        return range(first, last + 1, step)

    @classmethod
    def parse(cls, expression):
        """Parses the quartz expression

        :param expression: expression string encoded to parse
        :returns: sorted list of unique elements resulting from the expression
        :raises InvalidExpression: if any of the comma separated items is invalid
        """
        groups = [cls._parse_item(item) for item in expression.split(',')]
        return sorted(set(itertools.chain.from_iterable(groups)))
class MinuteParser(Parser):
    """Parser for the minute field: values from 0 to 59"""
    MIN_VALUE, MAX_VALUE = 0, 59
class HourParser(Parser):
    """Parser for the hour field: values from 0 to 23"""
    MIN_VALUE, MAX_VALUE = 0, 23
class MonthDayParser(Parser):
    """Parser for the day of month field: values from 1 to 31"""
    MIN_VALUE, MAX_VALUE = 1, 31
class MonthParser(Parser):
    """Parser for the month field: values from 1 to 12

    The three letter English abbreviations (JAN to DEC) are replaced by the
    number of the month.
    """
    MIN_VALUE, MAX_VALUE = 1, 12
    REPLACEMENTS = dict(
        JAN="1", FEB="2", MAR="3", APR="4",
        MAY="5", JUN="6", JUL="7", AUG="8",
        SEP="9", OCT="10", NOV="11", DEC="12",
    )
class WeekDayParser(Parser):
    """Parser for the day of week field: values from 1 to 7

    The three letter English abbreviations are replaced by the number of the
    day, being MON 1 and SUN 7.
    """
    MIN_VALUE, MAX_VALUE = 1, 7
    REPLACEMENTS = dict(
        MON="1", TUE="2", WED="3", THU="4",
        FRI="5", SAT="6", SUN="7",
    )
def parse_cron(expression):
    """Parses a cron expression into a dict of rrule keyword arguments

    The expression has six fields separated by a single space:
    minute, hour, day of month, month, day of week and year.
    A field set to "*" adds no restriction, so no key is produced for it.
    The year is validated here but it is not part of the result.

    :param expression: cron expression to parse
    :type expression: str
    :returns: dict with "bysecond" (always [0]) and, for each restricted
        field, "byminute", "byhour", "bymonthday", "bymonth" and "byweekday"
    :raises InvalidExpression: if the number of fields is not six, the year
        is neither "*" nor an integer, or any field fails to parse
    """
    try:
        minute, hour, monthday, month, weekday, year = expression.split(' ')
    except ValueError:
        raise InvalidExpression("Invalid number of items in expression: {}"
                                .format(expression))

    # Reject a broken year now: otherwise it goes unnoticed until something
    # tries to convert it to a number, long after the expression was accepted.
    if year != "*":
        try:
            int(year)
        except ValueError:
            raise InvalidExpression("Invalid year in expression: {}"
                                    .format(expression))

    result = dict()
    result["bysecond"] = [0]  # cron works at minute level
    if minute != "*":
        result["byminute"] = MinuteParser.parse(minute)
    if hour != "*":
        result["byhour"] = HourParser.parse(hour)
    if monthday != "*":
        result["bymonthday"] = MonthDayParser.parse(monthday)
    if month != "*":
        result["bymonth"] = MonthParser.parse(month)
    if weekday != "*":
        # rrule uses 0 to 6 for monday to sunday
        result["byweekday"] = [d - 1 for d in WeekDayParser.parse(weekday)]
    return result
def process(expression, start_date, end_date=None):
    """Builds the rrule for a cron expression between a start and end date

    Works with "naive" datetime objects only.

    :param expression: cron expression, parsed with parse_cron
    :param start_date: naive datetime the recurrence starts at
    :param end_date: optional naive datetime the recurrence ends at
    :returns: a minutely rrule restricted by the parsed expression
    :raises TypeError: if any of the dates has a timezone
    """
    has_timezone = start_date.tzinfo or (end_date and end_date.tzinfo)
    if has_timezone:
        raise TypeError("Timezones are forbidden in this land.")

    rule_kwargs = parse_cron(expression)

    # rrule strips out the microseconds, hence this hack (.after could be
    # used instead, but that changes the iface).
    # The cron expression works at minute level, so it is fine to move the
    # start one second past the minute: the point is not to generate the
    # minute that is already running.
    # Ex: if start time is 05:00.500 you should not generate 05:00
    started_within_minute = (start_date.second == 0
                             and start_date.microsecond != 0)
    if started_within_minute:
        start_date += dt.timedelta(seconds=1)

    rule_kwargs["dtstart"] = start_date
    if end_date:
        rule_kwargs["until"] = end_date

    # TODO: This can be optimized to values bigger than minutely
    # by checking if the minutes and hours are provided.
    # After hours (rrule.DAILY) it gets trickier as we have multiple
    # parameters affecting the recurrence (weekday/ month-day)
    return rrule.rrule(rrule.MINUTELY, **rule_kwargs)
def get_year_filter(year):
    """Creates a filter that only accepts occurrences of a given year

    :param year: "*" to accept any year, or the year to accept (Ex: "2015")
    :type year: str
    :returns: function that takes an occurrence (anything with a ``year``
        attribute) and tells whether it is accepted
    :raises ValueError: if year is neither "*" nor an integer
    """
    # Converted once, here, instead of on every occurrence filtered. It also
    # makes an invalid year fail when the filter is created.
    valid_year = None if year == "*" else int(year)

    def year_filter(occurrence):
        """Filter for years

        Using the year captured in the closure, returns False if the
        occurrence is before the year, True when it is in the year and
        raises StopIteration when it is past, as no later occurrence can be
        valid anymore.
        """
        if valid_year is None:
            return True
        if occurrence.year > valid_year:
            raise StopIteration("Valid time already past")
        return occurrence.year == valid_year

    return year_filter