227 lines
8.3 KiB
Python
227 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# High performance alternative to beat management command
|
|
# Looks for pending work before firing up all the Django machinery in separate processes
|
|
#
|
|
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
|
|
#
|
|
# USAGE: beat /path/to/project/manage.py
|
|
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
|
|
from orchestra.utils.sys import run, join, LockFile
|
|
|
|
|
|
class crontab_parser(object):
    """Parse one crontab field into the set of integer values it matches.

    Lightweight copy of ``celery.schedules.crontab_parser`` -- importing
    celery is too expensive for a script that runs every minute.

    Each comma-separated part may be ``*``, ``*/step``, ``N``, ``a-b``
    (wrapping around when ``b < a``), ``a-b/step`` or a weekday name
    (``sun`` .. ``sat``; only the first three letters are looked at,
    case-insensitively, so ``Monday`` works too).

    :param max_: number of distinct values the field can take.
    :param min_: smallest valid value; valid values are
        ``min_ .. min_ + max_ - 1``.
    """

    # Parse errors are plain ValueErrors so callers catch a single type.
    ParseException = ValueError

    _range = r'(\w+?)-(\w+)'
    _steps = r'/(\w+)?'
    _star = r'\*'

    # Compiled once per process rather than once per instance: the beat
    # script builds five parsers for every periodic task it checks.
    _range_steps_re = re.compile(_range + _steps)
    _range_re = re.compile(_range)
    _star_steps_re = re.compile(_star + _steps)
    _star_re = re.compile('^' + _star + '$')

    # Weekday abbreviation -> number, 0 = Sunday (celery's convention,
    # and the same numbering as strftime's %w).
    _weekdays = dict(zip(('sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'), range(7)))

    def __init__(self, max_=60, min_=0):
        self.max_ = max_
        self.min_ = min_
        # Order matters: the first pattern that matches a part wins.
        self.pats = (
            (self._range_steps_re, self._range_steps),
            (self._range_re, self._expand_range),
            (self._star_steps_re, self._star_steps),
            (self._star_re, self._expand_star),
        )

    def parse(self, spec):
        """Return the set of integers matched by *spec* (e.g. ``'*/15'``, ``'1-5,7'``).

        :raises ValueError: on empty parts, unknown names or out-of-range values.
        """
        acc = set()
        for part in spec.split(','):
            if not part:
                raise self.ParseException('empty part')
            acc.update(self._parse_part(part))
        return acc

    def _parse_part(self, part):
        """Expand a single comma-free part into a list of values."""
        for regex, handler in self.pats:
            m = regex.match(part)
            if m:
                return handler(m.groups())
        # No pattern matched: a plain number or a weekday name.
        return self._expand_range((part, ))

    def _expand_range(self, toks):
        """Expand ``(start,)`` or ``(start, end)`` tokens into a list of values."""
        fr = self._expand_number(toks[0])
        if len(toks) > 1:
            to = self._expand_number(toks[1])
            if to < fr:  # Wrap around max_ if necessary
                return (list(range(fr, self.min_ + self.max_)) +
                        list(range(self.min_, to + 1)))
            return list(range(fr, to + 1))
        return [fr]

    def _range_steps(self, toks):
        """Handle ``a-b/step``; *toks* is ``(a, b, step_or_None)``."""
        if len(toks) != 3 or not toks[2]:
            raise self.ParseException('empty filter')
        return self._expand_range(toks[:2])[::int(toks[2])]

    def _star_steps(self, toks):
        """Handle ``*/step``; *toks* is ``(step_or_None,)``."""
        if not toks or not toks[0]:
            raise self.ParseException('empty filter')
        return self._expand_star()[::int(toks[0])]

    def _expand_star(self, *args):
        """Return every valid value of the field."""
        return list(range(self.min_, self.max_ + self.min_))

    def _weekday(self, name):
        """Return 0-6 for a weekday name (0 = Sunday); raise KeyError if unknown."""
        return self._weekdays[name[0:3].lower()]

    def _expand_number(self, s):
        """Convert a numeric or weekday token to an int and range-check it.

        :raises ValueError: for negative numbers, unknown names or values
            outside ``min_ .. min_ + max_ - 1``.
        """
        if isinstance(s, str) and s[0] == '-':
            raise self.ParseException('negative numbers not supported')
        try:
            i = int(s)
        except ValueError:
            try:
                i = self._weekday(s)
            except KeyError:
                raise ValueError('Invalid weekday literal {0!r}.'.format(s))

        max_val = self.min_ + self.max_ - 1
        if i > max_val:
            raise ValueError(
                'Invalid end range: {0} > {1}.'.format(i, max_val))
        if i < self.min_:
            raise ValueError(
                'Invalid beginning range: {0} < {1}.'.format(i, self.min_))
        return i
|
|
|
|
|
|
class Setting(object):
    """Locate and load a Django settings module without importing Django.

    :param manage: path to the project's ``manage.py``; the settings module
        is found by scanning it for the ``DJANGO_SETTINGS_MODULE`` default.
    """

    # Matches ``"DJANGO_SETTINGS_MODULE", "pkg.settings"`` in either quote
    # style: Django >= 2.0 generates manage.py with single quotes.
    _settings_module_re = re.compile(
        r'''(["'])DJANGO_SETTINGS_MODULE\1\s*,\s*(["'])([^"']+)\2''')

    # Lines left out when executing the settings file (this is very costly).
    _skipped_prefixes = ('import djcelery', 'djcelery.setup_loader()')

    def __init__(self, manage):
        self.manage = manage
        self.settings_file = self.get_settings_file(manage)

    def get_settings(self):
        """Get db settings from settings.py file without importing.

        The file is executed with ``exec`` into a fresh namespace, with
        ``__file__`` pre-set so path computations relative to the settings
        file keep working; lines starting with the djcelery set-up calls are
        skipped.

        :returns: the resulting namespace as a dict.
        """
        with open(self.settings_file, encoding='utf-8') as f:
            content = ''.join(
                line for line in f if not line.startswith(self._skipped_prefixes))
        settings = {'__file__': self.settings_file}
        # compile() with the real filename so tracebacks point at settings.py.
        exec(compile(content, self.settings_file, 'exec'), settings)
        return settings

    def get_settings_file(self, manage):
        """Return the path of the settings ``.py`` file referenced by *manage*.

        The dotted module path is resolved relative to the directory that
        contains *manage*.

        :raises ValueError: if no ``DJANGO_SETTINGS_MODULE`` default is found.
        """
        with open(manage, encoding='utf-8') as handler:
            for line in handler:
                match = self._settings_module_re.search(line)
                if match:
                    settings_module = match.group(3)
                    settings_file = os.path.join(*settings_module.split('.')) + '.py'
                    return os.path.join(os.path.dirname(manage), settings_file)
        raise ValueError("settings module not found in %s" % manage)
|
|
|
|
|
|
class DB(object):
    """Tiny DB-API wrapper around the project's ``default`` database.

    :param settings: settings namespace; ``settings['DATABASES']['default']``
        is the Django database configuration used to connect.
    """

    SQLITE_ENGINES = ('django.db.backends.sqlite3',)
    # ``postgresql`` is the current Django name; ``postgresql_psycopg2`` the legacy alias.
    POSTGRES_ENGINES = (
        'django.db.backends.postgresql_psycopg2',
        'django.db.backends.postgresql',
    )

    def __init__(self, settings):
        self.settings = settings['DATABASES']['default']
        self.conn = None

    def connect(self):
        """Open ``self.conn`` for the configured engine.

        Drivers are imported lazily so only the one actually needed is loaded.

        :raises ValueError: if the engine is neither SQLite nor PostgreSQL.
        """
        engine = self.settings['ENGINE']
        if engine in self.SQLITE_ENGINES:
            import sqlite3
            self.conn = sqlite3.connect(self.settings['NAME'])
        elif engine in self.POSTGRES_ENGINES:
            import psycopg2
            params = {
                'dbname': self.settings.get('NAME'),
                'user': self.settings.get('USER'),
                'password': self.settings.get('PASSWORD'),
                'host': self.settings.get('HOST'),
                'port': self.settings.get('PORT'),
            }
            # Keyword arguments let psycopg2 quote the values itself; a
            # hand-built DSN breaks on passwords containing quotes or
            # backslashes. Unset/empty values fall back to libpq defaults.
            self.conn = psycopg2.connect(
                **{key: value for key, value in params.items() if value not in (None, '')})
        else:
            raise ValueError("%s engine not supported." % engine)

    def query(self, query):
        """Execute *query* and return all result rows; the cursor is always closed."""
        cur = self.conn.cursor()
        try:
            cur.execute(query)
            result = cur.fetchall()
        finally:
            cur.close()
        return result

    def close(self):
        """Close the connection; a no-op if ``connect()`` was never called."""
        if self.conn is not None:
            self.conn.close()
|
|
|
|
|
|
def fire_pending_tasks(manage, db):
    """Start ``manage.py runtask <id>`` for every enabled crontab task due now.

    Schedules are read directly from the djcelery tables and matched against
    the current UTC minute, hour, weekday (0 = Sunday), day and month.

    :param manage: path to the project's ``manage.py``.
    :param db: connected ``DB`` instance.
    :yields: whatever ``run(..., run_async=True)`` returns, once per task started.
    """
    # Snapshot the clock first so every task is checked against the same minute.
    stamp = tuple(int(field) for field in datetime.utcnow().strftime("%M %H %w %d %m").split())

    # SQLite keeps booleans as 0/1; other backends accept a literal True.
    if 'sqlite' in db.settings['ENGINE']:
        enabled_literal = 1
    else:
        enabled_literal = True
    sql = (
        "SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
        "FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
        "WHERE p.crontab_id = c.id AND p.enabled = {}"
    ).format(enabled_literal)

    # (max_, min_) parser bounds for minute, hour, day_of_week, day_of_month, month.
    bounds = ((60, 0), (24, 0), (7, 0), (31, 1), (12, 1))

    for row in db.query(sql):
        minute, hour, day_of_week, day_of_month, month_of_year, task_id = row
        specs = (minute, hour, day_of_week, day_of_month, month_of_year)
        # all() short-circuits, so later fields are only parsed if earlier ones match.
        due = all(
            current in crontab_parser(upper, lower).parse(spec)
            for current, spec, (upper, lower) in zip(stamp, specs, bounds)
        )
        if not due:
            continue
        yield run(
            'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
                manage=manage, task_id=task_id),
            run_async=True,
        )
|
|
|
|
|
|
def fire_pending_messages(settings, db, manage=None):
    """Start ``manage.py sendpendingmessages`` when the mailer has due work.

    :param settings: settings namespace; ``MAILER_DEFERE_SECONDS`` is read
        from it (default ``(300, 600, 3600, 86400)``).
    :param db: object exposing ``query(sql)`` that returns a list of rows.
    :param manage: path to the project's ``manage.py``. Optional for backward
        compatibility: when omitted, the module-level ``manage`` global set by
        the ``__main__`` block is used, as the original code implicitly did.
    :yields: whatever ``run(..., run_async=True)`` returns; nothing when no
        message is pending.
    :raises ValueError: if work is pending but no ``manage`` path is available.
    """
    def has_pending_messages(settings, db):
        """Return True if a message is QUEUED, or DEFERRED and due for retry."""
        MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
        now = datetime.utcnow()
        query_or = []
        for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
            # A message retried ``num`` times is due again once the num-th
            # back-off interval has elapsed since its last try.
            epoch = now - timedelta(seconds=seconds)
            query_or.append(
                "(mailer_message.retries = %i AND mailer_message.last_try <= '%s')"
                % (num, epoch.isoformat().replace('T', ' ')))
        # With no back-off intervals nothing deferred can be due; "0 = 1"
        # keeps the SQL valid instead of producing an empty "()".
        deferred_condition = ' OR '.join(query_or) or '0 = 1'
        query = (
            "SELECT 1 FROM mailer_message "
            "WHERE (mailer_message.state = 'QUEUED' "
            "OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1"
        ) % deferred_condition
        return bool(db.query(query))

    if has_pending_messages(settings, db):
        if manage is None:
            manage = globals().get('manage')
        if manage is None:
            raise ValueError("path to manage.py is required to send pending messages")
        command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
        yield run(command, run_async=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Fail fast with a usage hint rather than an IndexError traceback raised
    # after the lock file has already been taken.
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s /path/to/project/manage.py\n" % sys.argv[0])
        sys.exit(2)

    with LockFile('/dev/shm/beat.lock', expire=20):
        manage = sys.argv[1]
        procs = []
        settings = Setting(manage).get_settings()
        installed_apps = settings.get('INSTALLED_APPS', ())
        db = DB(settings)
        db.connect()
        try:
            # Non-blocking loop, we need to finish this in time for the next minute.
            if 'orchestra.contrib.tasks' in installed_apps:
                if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
                    procs.extend(fire_pending_tasks(manage, db))
            if 'orchestra.contrib.mailer' in installed_apps:
                procs.extend(fire_pending_messages(settings, db))
        finally:
            db.close()
    sys.exit(0)
|