django-orchestra/orchestra/contrib/orchestration/manager.py

210 lines
8.7 KiB
Python
Raw Permalink Normal View History

import logging
2014-05-08 16:59:35 +00:00
import threading
import traceback
2015-03-23 15:36:51 +00:00
from collections import OrderedDict
2014-05-08 16:59:35 +00:00
from django.core.mail import mail_admins
2014-05-08 16:59:35 +00:00
from orchestra.utils import db
2015-04-20 14:23:10 +00:00
from orchestra.utils.python import import_class, OrderedSet
2014-05-13 13:46:40 +00:00
2015-04-07 15:14:49 +00:00
from . import settings, Operation
2015-04-01 15:49:21 +00:00
from .backends import ServiceBackend
2014-05-08 16:59:35 +00:00
from .helpers import send_report
2015-04-07 15:14:49 +00:00
from .models import BackendLog
2015-05-05 19:42:55 +00:00
from .signals import pre_action, post_action, pre_commit, post_commit, pre_prepare, post_prepare
2014-05-08 16:59:35 +00:00
# Module-level logger shared by every function in this file
logger = logging.getLogger(__name__)

# Router loaded through import_class from the ORCHESTRATION_ROUTER setting;
# the functions below use it as router.objects.get_for_operation(...)
router = import_class(settings.ORCHESTRATION_ROUTER)


def keep_log(execute, log, operations):
    """Wrap ``execute`` so that its outcome is always stored and reported.

    Args:
        execute: callable that runs the backend script. It is called with the
            wrapper's own positional and keyword arguments and is expected to
            return the resulting log object.
        log: kept for interface compatibility; the log actually used is the
            one handed to the wrapper through its ``log`` keyword argument.
        operations: iterable of operations; each one gets ``store(log)``
            called once the execution has finished, successfully or not.

    Returns:
        A ``wrapper(*args, **kwargs)`` callable that returns ``None``.
        ``Exception`` subclasses raised by ``execute`` are recorded on the log,
        logged and mailed to the admins instead of being propagated.
    """
    def wrapper(*args, **kwargs):
        """Run ``execute``, record any failure on the log and send the report."""
        # Remember that threads have their own connection pool
        # No need to EVER tamper with the transaction here
        log = kwargs['log']
        try:
            result = execute(*args, **kwargs)
            # Keep the log we were given when execute() returns nothing,
            # otherwise the bookkeeping below would fail on None
            if result is not None:
                log = result
        except Exception:
            trace = traceback.format_exc()
            log.state = log.EXCEPTION
            log.stderr += trace
            log.save()
            subject = 'EXCEPTION executing backend(s) %s %s' % (args, kwargs)
            logger.error(subject)
            logger.error(trace)
            mail_admins(subject, trace)
            # We don't propagate the exception further to avoid transaction rollback
        finally:
            # Store and log the operation
            for operation in operations:
                logger.info("Executed %s", operation)
                operation.store(log)
            if not log.is_success:
                send_report(execute, args, log)
            # Output is squeezed to ASCII before it reaches the debug log
            stdout = log.stdout.strip()
            if stdout:
                logger.debug('STDOUT %s', stdout.encode('ascii', errors='replace').decode())
            stderr = log.stderr.strip()
            if stderr:
                logger.debug('STDERR %s', stderr.encode('ascii', errors='replace').decode())
    return wrapper
2015-04-01 15:49:21 +00:00
def generate(operations):
    """Build one backend script per (route, backend, async flag) combination.

    Args:
        operations: iterable of operations. An operation whose ``routes`` is
            ``None`` gets them resolved through the router (and stored back on
            the operation) before being processed.

    Returns:
        A ``(scripts, serialize)`` tuple. ``scripts`` is an ``OrderedDict``
        keyed by ``(route, operation.backend, async_action)`` whose values are
        ``(backend_instance, [operations])`` pairs. ``serialize`` is ``True``
        when at least one backend that handled an operation has a truthy
        ``serialize`` attribute.
    """
    scripts = OrderedDict()
    cache = {}
    serialize = False
    # Generate scripts per route+backend
    for operation in operations:
        logger.debug("Queued %s", operation)
        if operation.routes is None:
            operation.routes = router.objects.get_for_operation(operation, cache=cache)
        for route in operation.routes:
            # TODO key by action.async
            async_action = route.action_is_async(operation.action)
            key = (route, operation.backend, async_action)
            if key not in scripts:
                # First operation for this key: instantiate the backend and
                # open its script (head + prepare) before any action is added
                backend = operation.backend()
                scripts[key] = (backend, [operation])
                backend.set_head()
                pre_prepare.send(sender=backend.__class__, backend=backend)
                backend.prepare()
                post_prepare.send(sender=backend.__class__, backend=backend)
            else:
                backend, grouped = scripts[key]
                grouped.append(operation)
            # Get and call backend action method
            method = getattr(backend, operation.action)
            kwargs = {
                'sender': backend.__class__,
                'backend': backend,
                'instance': operation.instance,
                'action': operation.action,
            }
            backend.set_content()
            pre_action.send(**kwargs)
            method(operation.instance)
            post_action.send(**kwargs)
            if backend.serialize:
                serialize = True
    # Close every script once all of its operations have been added
    for backend, __ in scripts.values():
        backend.set_tail()
        pre_commit.send(sender=backend.__class__, backend=backend)
        backend.commit()
        post_commit.send(sender=backend.__class__, backend=backend)
    return scripts, serialize
2015-04-01 15:49:21 +00:00
def execute(scripts, serialize=False, run_async=None):
    """
    executes the operations on the servers

    scripts: mapping of (route, backend, async_action) keys to
             (backend_instance, operations) values, as built by generate()
    serialize: execute one backend at a time
    run_async: do not join threads (overrides route.run_async)

    Returns the list of logs created for the scripts, in iteration order, or
    an empty list when ORCHESTRATION_DISABLE_EXECUTION is set.
    """
    if settings.ORCHESTRATION_DISABLE_EXECUTION:
        logger.info('Orchestration execution is dissabled by ORCHESTRATION_DISABLE_EXECUTION.')
        return []
    # Execute scripts on each server
    threads_to_join = []
    logs = []
    for key, value in scripts.items():
        route, __, async_action = key
        backend, operations = value
        args = (route.host,)
        # An explicit run_async takes precedence over the route's own setting;
        # serialized execution is never asynchronous
        requested = route.run_async if run_async is None else run_async
        is_async = not serialize and (requested or async_action)
        kwargs = {
            'run_async': is_async,
        }
        # we clone the connection just in case we are isolated inside a transaction
        with db.clone(model=BackendLog) as handle:
            log = backend.create_log(*args, using=handle.target)
            # NOTE(review): presumably points later saves of the log back at
            # the original connection -- confirm against db.clone
            log._state.db = handle.origin
        kwargs['log'] = log
        task = keep_log(backend.execute, log, operations)
        logger.debug('%s is going to be executed on %s.', backend, route.host)
        if serialize:
            # Execute one backend at a time, no need for threads
            task(*args, **kwargs)
        else:
            task = db.close_connection(task)
            thread = threading.Thread(target=task, args=args, kwargs=kwargs)
            thread.start()
            # Asynchronous threads are started but never waited for
            if not is_async:
                threads_to_join.append(thread)
        logs.append(log)
    for thread in threads_to_join:
        thread.join()
    return logs
2015-04-01 15:49:21 +00:00
def collect(instance, action, **kwargs):
    """Collect the backend operations triggered by ``action`` on ``instance``.

    Args:
        instance: the object the action was performed on.
        action: the action name; compared against ``Operation.DELETE``.
        **kwargs: optional extras read by this function:
            ``operations``: set to add to (a new ``OrderedSet`` by default);
            ``route_cache``: dict passed to the router as its ``cache``;
            ``update_fields``: field names being saved, used to skip saves
            that only touch fields listed in the backend's ``ignore_fields``;
            ``pk_set`` and ``model``: when ``pk_set`` is present, many-to-many
            related objects are fetched from ``model`` by primary key.

    Returns:
        The ``operations`` set, updated in place.
    """
    operations = kwargs.get('operations', OrderedSet())
    route_cache = kwargs.get('route_cache', {})
    update_fields = kwargs.get('update_fields', None)
    for backend_cls in ServiceBackend.get_backends():
        # Check if there exists a related instance to be executed for this backend and action
        instances = []
        if action in backend_cls.actions:
            if backend_cls.is_main(instance):
                instances = [(instance, action)]
            else:
                for related in backend_cls.get_related(instance):
                    if related.__class__.__name__ == 'ManyRelatedManager':
                        if 'pk_set' in kwargs:
                            # m2m_changed signal
                            candidates = kwargs['model'].objects.filter(pk__in=kwargs['pk_set'])
                        else:
                            candidates = related.all()
                    else:
                        candidates = [related]
                    for candidate in candidates:
                        # Check if a delete for candidate is in operations
                        delete_mock = Operation(backend_cls, candidate, Operation.DELETE)
                        if delete_mock not in operations:
                            # related objects with backend.model trigger save()
                            instances.append((candidate, Operation.SAVE))
        for selected, iaction in instances:
            # Maintain consistent state of operations based on save/delete behaviour
            # Prevent creating a deleted selected by deleting existing saves
            if iaction == Operation.DELETE:
                save_mock = Operation(backend_cls, selected, Operation.SAVE)
                try:
                    operations.remove(save_mock)
                except KeyError:
                    pass
            elif update_fields is not None and update_fields != []:
                # TODO remove this, django does not execute post_save if update_fields=[]...
                #      Maybe open a ticket at Djangoproject ?
                # INITIAL INTENTION: "update_fields=[]" is a convention for explicitly executing backend
                # i.e. account.disable()
                # Skip the save when every updated field is ignored by the backend
                if all(field in backend_cls.ignore_fields for field in update_fields):
                    continue
            operation = Operation(backend_cls, selected, iaction)
            # Only schedule operations if the router has execution routes
            routes = router.objects.get_for_operation(operation, cache=route_cache)
            if routes:
                logger.debug("Operation %s collected for execution", operation)
                operation.routes = routes
                if iaction == Operation.DELETE:
                    operation.preload_context()
                else:
                    # usually we expect to be using last object state,
                    # except when we are deleting it
                    operations.discard(operation)
                operations.add(operation)
    return operations