From 5344e732fc5183d4c5ea5295fab5aee3e5251e58 Mon Sep 17 00:00:00 2001 From: Marc Date: Fri, 11 Jul 2014 21:09:17 +0000 Subject: [PATCH] Fixes on resource monitoring --- TODO.md | 5 +- orchestra/apps/orchestration/manager.py | 6 +- orchestra/apps/resources/admin.py | 7 +- orchestra/apps/resources/backends.py | 1 + orchestra/apps/resources/models.py | 39 +++- orchestra/apps/resources/tasks.py | 20 +- orchestra/apps/websites/backends/apache.py | 15 +- orchestra/bin/celerybeat | 119 ++++++++-- orchestra/bin/celeryd | 243 +++++++++++++++++---- orchestra/models/utils.py | 22 ++ 10 files changed, 373 insertions(+), 104 deletions(-) diff --git a/TODO.md b/TODO.md index bc5165ed..075fe2b7 100644 --- a/TODO.md +++ b/TODO.md @@ -51,6 +51,9 @@ Remember that, as always with QuerySets, any subsequent chained methods which im * create custom field that returns backend python objects * Timezone awareness on monitoring system (reading server-side logs with different TZ than orchestra) maybe a settings value? (use UTC internally, timezone.localtime() when interacting with servers) -* Resource metric: KB MB B? +* Resource metric: KB MB B? RESOURCE UNIT!! 
forms and serializers * EMAIL backend operations which contain stderr messages (because under certain failures status code is still 0) + + +* Settings dictionary like DRF2 in order to better override large settings like WEBSITES_APPLICATIONS.etc diff --git a/orchestra/apps/orchestration/manager.py b/orchestra/apps/orchestration/manager.py index 1b9254ff..fdd22e04 100644 --- a/orchestra/apps/orchestration/manager.py +++ b/orchestra/apps/orchestration/manager.py @@ -10,9 +10,9 @@ from .helpers import send_report def as_task(execute): def wrapper(*args, **kwargs): -# with db.transaction.commit_manually(): - log = execute(*args, **kwargs) -# db.transaction.commit() + with db.transaction.commit_manually(): + log = execute(*args, **kwargs) + db.transaction.commit() if log.state != log.SUCCESS: send_report(execute, args, log) return log diff --git a/orchestra/apps/resources/admin.py b/orchestra/apps/resources/admin.py index 7f6ba1fe..53a45cac 100644 --- a/orchestra/apps/resources/admin.py +++ b/orchestra/apps/resources/admin.py @@ -63,13 +63,16 @@ class ResourceAdmin(ExtendedModelAdmin): class ResourceDataAdmin(admin.ModelAdmin): - list_display = ('id', 'resource', 'used', 'allocated', 'last_update',) # TODO content_object + list_display = ('id', 'resource', 'used', 'allocated', 'last_update', 'content_type') # TODO content_object list_filter = ('resource',) class MonitorDataAdmin(admin.ModelAdmin): - list_display = ('id', 'monitor', 'date', 'value') # TODO content_object + list_display = ('id', 'monitor', 'date', 'value', 'ct', 'object_id') # TODO content_object list_filter = ('monitor',) + + def ct(self, i): + return i.content_type_id admin.site.register(Resource, ResourceAdmin) diff --git a/orchestra/apps/resources/backends.py b/orchestra/apps/resources/backends.py index 1f7e7d8d..6339f316 100644 --- a/orchestra/apps/resources/backends.py +++ b/orchestra/apps/resources/backends.py @@ -12,6 +12,7 @@ class ServiceMonitor(ServiceBackend): DISK = 'disk' MEMORY = 'memory' 
CPU = 'cpu' + # TODO UNITS actions = ('monitor', 'resource_exceeded', 'resource_recovery') diff --git a/orchestra/apps/resources/models.py b/orchestra/apps/resources/models.py index 4d8c0b98..73609738 100644 --- a/orchestra/apps/resources/models.py +++ b/orchestra/apps/resources/models.py @@ -1,13 +1,16 @@ import datetime from django.db import models +from django.db.models.loading import get_model from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType from django.core import validators +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from djcelery.models import PeriodicTask, CrontabSchedule from orchestra.models.fields import MultiSelectField +from orchestra.models.utils import get_model_field_path from orchestra.utils.apps import autodiscover from .backends import ServiceMonitor @@ -119,11 +122,13 @@ class ResourceData(models.Model): @classmethod def get_or_create(cls, obj, resource): + ct = ContentType.objects.get_for_model(type(obj)) try: - return cls.objects.get(content_object=obj, resource=resource) - except cls.DoesNotExists: + return cls.objects.get(content_type=ct, object_id=obj.pk, + resource=resource) + except cls.DoesNotExist: return cls.objects.create(content_object=obj, resource=resource, - allocated=resource.defalt_allocation) + allocated=resource.default_allocation) def get_used(self): resource = self.resource @@ -131,8 +136,19 @@ class ResourceData(models.Model): result = 0 has_result = False for monitor in resource.monitors: - dataset = MonitorData.objects.filter(monitor=monitor, - content_type=self.content_type, object_id=self.object_id) + resource_model = self.content_type.model_class() + monitor_model = get_model(ServiceMonitor.get_backend(monitor).model) + if resource_model == monitor_model: + dataset = MonitorData.objects.filter(monitor=monitor, + content_type=self.content_type_id, object_id=self.object_id) + else: + 
path = get_model_field_path(monitor_model, resource_model) + fields = '__'.join(path) + objects = monitor_model.objects.filter(**{fields: self.object_id}) + pks = objects.values_list('id', flat=True) + ct = ContentType.objects.get_for_model(monitor_model) + dataset = MonitorData.objects.filter(monitor=monitor, + content_type=ct, object_id__in=pks) if resource.period == resource.MONTHLY_AVG: try: last = dataset.latest() @@ -143,17 +159,18 @@ class ResourceData(models.Model): tzinfo=timezone.utc) total = (epoch-last.date).total_seconds() dataset = dataset.filter(date__year=today.year, - date__month=today.month) + date__month=today.month) for data in dataset: slot = (previous-data.date).total_seconds() result += data.value * slot/total elif resource.period == resource.MONTHLY_SUM: - data = dataset.filter(date__year=today.year, - date__month=today.month) - value = data.aggregate(models.Sum('value'))['value__sum'] - if value: + data = dataset.filter(date__year=today.year, date__month=today.month) + # FIXME Aggregation of 0s returns None! django bug? 
+ # value = data.aggregate(models.Sum('value'))['value__sum'] + values = data.values_list('value', flat=True) + if values: has_result = True - result += value + result += sum(values) elif resource.period == resource.LAST: try: result += dataset.latest().value diff --git a/orchestra/apps/resources/tasks.py b/orchestra/apps/resources/tasks.py index 0f834994..26f0553f 100644 --- a/orchestra/apps/resources/tasks.py +++ b/orchestra/apps/resources/tasks.py @@ -1,9 +1,11 @@ from celery import shared_task +from django.db.models.loading import get_model from django.utils import timezone + from orchestra.apps.orchestration.models import BackendOperation as Operation from .backends import ServiceMonitor -from .models import MonitorData +from .models import ResourceData, Resource @shared_task(name='resources.Monitor') @@ -13,7 +15,7 @@ def monitor(resource_id): # Execute monitors for monitor_name in resource.monitors: backend = ServiceMonitor.get_backend(monitor_name) - model = backend.model + model = get_model(*backend.model.split('.')) operations = [] # Execute monitor for obj in model.objects.all(): @@ -22,18 +24,18 @@ def monitor(resource_id): # Update used resources and trigger resource exceeded and revovery operations = [] - model = resource.model + model = resource.content_type.model_class() for obj in model.objects.all(): - data = MonitorData.get_or_create(obj, resource) + data = ResourceData.get_or_create(obj, resource) current = data.get_used() if not resource.disable_trigger: if data.used < data.allocated and current > data.allocated: - op = Operation.create(backend, data.content_object, Operation.EXCEED) + op = Operation.create(backend, obj, Operation.EXCEED) operations.append(op) - elif res.used > res.allocated and current < res.allocated: - op = Operation.create(backend, data.content_object, Operation.RECOVERY) + elif data.used > data.allocated and current < data.allocated: + op = Operation.create(backend, obj, Operation.RECOVERY) operation.append(op) - data.used 
= current - data.las_update = timezone.now() + data.used = current or 0 + data.last_update = timezone.now() data.save() Operation.execute(operations) diff --git a/orchestra/apps/websites/backends/apache.py b/orchestra/apps/websites/backends/apache.py index 2006b4c4..5c751eda 100644 --- a/orchestra/apps/websites/backends/apache.py +++ b/orchestra/apps/websites/backends/apache.py @@ -184,10 +184,11 @@ class Apache2Traffic(ServiceMonitor): def monitor(self, site): context = self.get_context(site) - self.append(""" + self.append("""{ awk 'BEGIN { ini = "%(last_date)s" end = "%(current_date)s" + sum = 0 months["Jan"] = "01"; months["Feb"] = "02"; @@ -212,16 +213,10 @@ class Apache2Traffic(ServiceMonitor): second = substr(date, 19, 2) line_date = year month day hour minute second if ( line_date > ini && line_date < end) - if ( $10 == "" ) - sum += $9 - else - sum += $10 + sum += $NF } END { - if ( sum ) - print sum - else - print 0 - }' %(log_file)s | xargs echo %(object_id)s """ % context) + print sum + }' %(log_file)s || echo 0; } | xargs echo %(object_id)s """ % context) def get_context(self, site): return { diff --git a/orchestra/bin/celerybeat b/orchestra/bin/celerybeat index e43c300b..00e8b35b 100755 --- a/orchestra/bin/celerybeat +++ b/orchestra/bin/celerybeat @@ -10,8 +10,8 @@ ### BEGIN INIT INFO # Provides: celerybeat -# Required-Start: $network $local_fs $remote_fs postgresql celeryd -# Required-Stop: $network $local_fs $remote_fs postgresql celeryd +# Required-Start: $network $local_fs $remote_fs +# Required-Stop: $network $local_fs $remote_fs # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # Short-Description: celery periodic task scheduler @@ -20,25 +20,104 @@ # Cannot use set -e/bash -e since the kill -0 command will abort # abnormally in the absence of a valid process ID. #set -e +VERSION=10.0 +echo "celery init v${VERSION}." 
-DEFAULT_PID_FILE="/var/run/celery/beat.pid" -DEFAULT_LOG_FILE="/var/log/celery/beat.log" -DEFAULT_LOG_LEVEL="INFO" -DEFAULT_CELERYBEAT="celerybeat" +if [ $(id -u) -ne 0 ]; then + echo "Error: This program can only be used by the root user." + echo " Unpriviliged users must use 'celery beat --detach'" + exit 1 +fi + + +# May be a runlevel symlink (e.g. S02celeryd) +if [ -L "$0" ]; then + SCRIPT_FILE=$(readlink "$0") +else + SCRIPT_FILE="$0" +fi +SCRIPT_NAME="$(basename "$SCRIPT_FILE")" # /etc/init.d/celerybeat: start and stop the celery periodic task scheduler daemon. +# Make sure executable configuration script is owned by root +_config_sanity() { + local path="$1" + local owner=$(ls -ld "$path" | awk '{print $3}') + local iwgrp=$(ls -ld "$path" | cut -b 6) + local iwoth=$(ls -ld "$path" | cut -b 9) + + if [ "$(id -u $owner)" != "0" ]; then + echo "Error: Config script '$path' must be owned by root!" + echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with mailicious intent. When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change ownership of the script:" + echo " $ sudo chown root '$path'" + exit 1 + fi + + if [ "$iwoth" != "-" ]; then # S_IWOTH + echo "Error: Config script '$path' cannot be writable by others!" + echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with malicious intent. When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change the scripts permissions:" + echo " $ sudo chmod 640 '$path'" + exit 1 + fi + if [ "$iwgrp" != "-" ]; then # S_IWGRP + echo "Error: Config script '$path' cannot be writable by group!" + echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with malicious intent. 
When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change the scripts permissions:" + echo " $ sudo chmod 640 '$path'" + exit 1 + fi +} + +scripts="" + if test -f /etc/default/celeryd; then + scripts="/etc/default/celeryd" + _config_sanity /etc/default/celeryd . /etc/default/celeryd fi -if test -f /etc/default/celerybeat; then - . /etc/default/celerybeat +EXTRA_CONFIG="/etc/default/${SCRIPT_NAME}" +if test -f "$EXTRA_CONFIG"; then + scripts="$scripts, $EXTRA_CONFIG" + _config_sanity "$EXTRA_CONFIG" + . "$EXTRA_CONFIG" fi +echo "Using configuration: $scripts" + +CELERY_BIN=${CELERY_BIN:-"celery"} +DEFAULT_USER="celery" +DEFAULT_PID_FILE="/var/run/celery/beat.pid" +DEFAULT_LOG_FILE="/var/log/celery/beat.log" +DEFAULT_LOG_LEVEL="INFO" +DEFAULT_CELERYBEAT="$CELERY_BIN beat" + CELERYBEAT=${CELERYBEAT:-$DEFAULT_CELERYBEAT} CELERYBEAT_LOG_LEVEL=${CELERYBEAT_LOG_LEVEL:-${CELERYBEAT_LOGLEVEL:-$DEFAULT_LOG_LEVEL}} +# Sets --app argument for CELERY_BIN +CELERY_APP_ARG="" +if [ ! -z "$CELERY_APP" ]; then + CELERY_APP_ARG="--app=$CELERY_APP" +fi + +CELERYBEAT_USER=${CELERYBEAT_USER:-${CELERYD_USER:-$DEFAULT_USER}} + # Set CELERY_CREATE_DIRS to always create log/pid dirs. CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0} CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS @@ -64,16 +143,10 @@ CELERYBEAT_LOG_DIR=`dirname $CELERYBEAT_LOG_FILE` CELERYBEAT_PID_DIR=`dirname $CELERYBEAT_PID_FILE` # Extra start-stop-daemon options, like user/group. -if [ -n "$CELERYBEAT_USER" ]; then - DAEMON_OPTS="$DAEMON_OPTS --uid $CELERYBEAT_USER" -fi -if [ -n "$CELERYBEAT_GROUP" ]; then - DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYBEAT_GROUP" -fi CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR} if [ -n "$CELERYBEAT_CHDIR" ]; then - DAEMON_OPTS="$DAEMON_OPTS --workdir $CELERYBEAT_CHDIR" + DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYBEAT_CHDIR" fi @@ -155,7 +228,7 @@ wait_pid () { stop_beat () { - echo -n "Stopping celerybeat... " + echo -n "Stopping ${SCRIPT_NAME}... 
" if [ -f "$CELERYBEAT_PID_FILE" ]; then wait_pid $(cat "$CELERYBEAT_PID_FILE") else @@ -163,12 +236,13 @@ stop_beat () { fi } +_chuid () { + su "$CELERYBEAT_USER" -c "$CELERYBEAT $*" +} + start_beat () { - echo "Starting celerybeat..." - if [ -n "$VIRTUALENV" ]; then - source $VIRTUALENV/bin/activate - fi - $CELERYBEAT $CELERYBEAT_OPTS $DAEMON_OPTS --detach \ + echo "Starting ${SCRIPT_NAME}..." + _chuid $CELERY_APP_ARG $CELERYBEAT_OPTS $DAEMON_OPTS --detach \ --pidfile="$CELERYBEAT_PID_FILE" } @@ -203,10 +277,9 @@ case "$1" in check_paths ;; *) - echo "Usage: /etc/init.d/celerybeat {start|stop|restart|create-paths}" + echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|create-paths}" exit 64 # EX_USAGE ;; esac exit 0 - diff --git a/orchestra/bin/celeryd b/orchestra/bin/celeryd index 0d34dce6..df918bca 100755 --- a/orchestra/bin/celeryd +++ b/orchestra/bin/celeryd @@ -11,25 +11,106 @@ ### BEGIN INIT INFO # Provides: celeryd -# Required-Start: $network $local_fs $remote_fs postgresql celeryev rabbitmq-server -# Required-Stop: $network $local_fs $remote_fs postgresql celeryev rabbitmq-server +# Required-Start: $network $local_fs $remote_fs +# Required-Stop: $network $local_fs $remote_fs # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # Short-Description: celery task worker daemon ### END INIT INFO +# +# +# To implement separate init scripts, copy this script and give it a different +# name: +# I.e., if my new application, "little-worker" needs an init, I +# should just use: +# +# cp /etc/init.d/celeryd /etc/init.d/little-worker +# +# You can then configure this by manipulating /etc/default/little-worker. +# +VERSION=10.0 +echo "celery init v${VERSION}." +if [ $(id -u) -ne 0 ]; then + echo "Error: This program can only be used by the root user." + echo " Unprivileged users must use the 'celery multi' utility, " + echo " or 'celery worker --detach'." 
+ exit 1 +fi -# some commands work asyncronously, so we'll wait this many seconds -SLEEP_SECONDS=5 +# Can be a runlevel symlink (e.g. S02celeryd) +if [ -L "$0" ]; then + SCRIPT_FILE=$(readlink "$0") +else + SCRIPT_FILE="$0" +fi +SCRIPT_NAME="$(basename "$SCRIPT_FILE")" + +DEFAULT_USER="celery" DEFAULT_PID_FILE="/var/run/celery/%n.pid" -DEFAULT_LOG_FILE="/var/log/celery/%n.log" +DEFAULT_LOG_FILE="/var/log/celery/%n%I.log" DEFAULT_LOG_LEVEL="INFO" DEFAULT_NODES="celery" -DEFAULT_CELERYD="-m celery.bin.celeryd_detach" +DEFAULT_CELERYD="-m celery worker --detach" -CELERY_DEFAULTS=${CELERY_DEFAULTS:-"/etc/default/celeryd"} +CELERY_DEFAULTS=${CELERY_DEFAULTS:-"/etc/default/${SCRIPT_NAME}"} -test -f "$CELERY_DEFAULTS" && . "$CELERY_DEFAULTS" +# Make sure executable configuration script is owned by root +_config_sanity() { + local path="$1" + local owner=$(ls -ld "$path" | awk '{print $3}') + local iwgrp=$(ls -ld "$path" | cut -b 6) + local iwoth=$(ls -ld "$path" | cut -b 9) + + if [ "$(id -u $owner)" != "0" ]; then + echo "Error: Config script '$path' must be owned by root!" + echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with mailicious intent. When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change ownership of the script:" + echo " $ sudo chown root '$path'" + exit 1 + fi + + if [ "$iwoth" != "-" ]; then # S_IWOTH + echo "Error: Config script '$path' cannot be writable by others!" + echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with malicious intent. When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change the scripts permissions:" + echo " $ sudo chmod 640 '$path'" + exit 1 + fi + if [ "$iwgrp" != "-" ]; then # S_IWGRP + echo "Error: Config script '$path' cannot be writable by group!" 
+ echo + echo "Resolution:" + echo "Review the file carefully and make sure it has not been " + echo "modified with malicious intent. When sure the " + echo "script is safe to execute with superuser privileges " + echo "you can change the scripts permissions:" + echo " $ sudo chmod 640 '$path'" + exit 1 + fi +} + +if [ -f "$CELERY_DEFAULTS" ]; then + _config_sanity "$CELERY_DEFAULTS" + echo "Using config script: $CELERY_DEFAULTS" + . "$CELERY_DEFAULTS" +fi + +# Sets --app argument for CELERY_BIN +CELERY_APP_ARG="" +if [ ! -z "$CELERY_APP" ]; then + CELERY_APP_ARG="--app=$CELERY_APP" +fi + +CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER} # Set CELERY_CREATE_DIRS to always create log/pid dirs. CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0} @@ -45,8 +126,8 @@ if [ -z "$CELERYD_LOG_FILE" ]; then fi CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}} -CELERYD_MULTI=${CELERYD_MULTI:-"celeryd-multi"} -CELERYD=${CELERYD:-$DEFAULT_CELERYD} +CELERY_BIN=${CELERY_BIN:-"celery"} +CELERYD_MULTI=${CELERYD_MULTI:-"$CELERY_BIN multi"} CELERYD_NODES=${CELERYD_NODES:-$DEFAULT_NODES} export CELERY_LOADER @@ -59,13 +140,6 @@ CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE` CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE` # Extra start-stop-daemon options, like user/group. -if [ -n "$CELERYD_USER" ]; then - DAEMON_OPTS="$DAEMON_OPTS --uid=$CELERYD_USER" -fi -if [ -n "$CELERYD_GROUP" ]; then - DAEMON_OPTS="$DAEMON_OPTS --gid=$CELERYD_GROUP" -fi - if [ -n "$CELERYD_CHDIR" ]; then DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR" fi @@ -125,58 +199,119 @@ create_paths() { export PATH="${PATH:+$PATH:}/usr/sbin:/sbin" -_get_pid_files() { - [ ! 
-d "$CELERYD_PID_DIR" ] && return - echo `ls -1 "$CELERYD_PID_DIR"/*.pid 2> /dev/null` +_get_pids() { + found_pids=0 + my_exitcode=0 + + for pid_file in "$CELERYD_PID_DIR"/*.pid; do + local pid=`cat "$pid_file"` + local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'` + if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then + echo "bad pid file ($pid_file)" + one_failed=true + my_exitcode=1 + else + found_pids=1 + echo "$pid" + fi + + if [ $found_pids -eq 0 ]; then + echo "${SCRIPT_NAME}: All nodes down" + exit $my_exitcode + fi + done } -stop_workers () { - $CELERYD_MULTI stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE" - sleep $SLEEP_SECONDS + +_chuid () { + su "$CELERYD_USER" -c "$CELERYD_MULTI $*" } start_workers () { - $CELERYD_MULTI start $CELERYD_NODES $DAEMON_OPTS \ - --pidfile="$CELERYD_PID_FILE" \ - --logfile="$CELERYD_LOG_FILE" \ - --loglevel="$CELERYD_LOG_LEVEL" \ - --cmd="$CELERYD" \ - $CELERYD_OPTS - sleep $SLEEP_SECONDS + if [ ! -z "$CELERYD_ULIMIT" ]; then + ulimit $CELERYD_ULIMIT + fi + _chuid $* start $CELERYD_NODES $DAEMON_OPTS \ + --pidfile="$CELERYD_PID_FILE" \ + --logfile="$CELERYD_LOG_FILE" \ + --loglevel="$CELERYD_LOG_LEVEL" \ + $CELERY_APP_ARG \ + $CELERYD_OPTS +} + + +dryrun () { + (C_FAKEFORK=1 start_workers --verbose) +} + + +stop_workers () { + _chuid stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE" } restart_workers () { - $CELERYD_MULTI restart $CELERYD_NODES $DAEMON_OPTS \ - --pidfile="$CELERYD_PID_FILE" \ - --logfile="$CELERYD_LOG_FILE" \ - --loglevel="$CELERYD_LOG_LEVEL" \ - --cmd="$CELERYD" \ - $CELERYD_OPTS - sleep $SLEEP_SECONDS + _chuid restart $CELERYD_NODES $DAEMON_OPTS \ + --pidfile="$CELERYD_PID_FILE" \ + --logfile="$CELERYD_LOG_FILE" \ + --loglevel="$CELERYD_LOG_LEVEL" \ + $CELERY_APP_ARG \ + $CELERYD_OPTS } + +kill_workers() { + _chuid kill $CELERYD_NODES --pidfile="$CELERYD_PID_FILE" +} + + +restart_workers_graceful () { + local worker_pids= + worker_pids=`_get_pids` + [ "$one_failed" ] && exit 1 + + for 
worker_pid in $worker_pids; do + local failed= + kill -HUP $worker_pid 2> /dev/null || failed=true + if [ "$failed" ]; then + echo "${SCRIPT_NAME} worker (pid $worker_pid) could not be restarted" + one_failed=true + else + echo "${SCRIPT_NAME} worker (pid $worker_pid) received SIGHUP" + fi + done + + [ "$one_failed" ] && exit 1 || exit 0 +} + + check_status () { - local pid_files= - pid_files=`_get_pid_files` - [ -z "$pid_files" ] && echo "celeryd not running (no pidfile)" && exit 1 + my_exitcode=0 + found_pids=0 local one_failed= - for pid_file in $pid_files; do + for pid_file in "$CELERYD_PID_DIR"/*.pid; do + if [ ! -r $pid_file ]; then + echo "${SCRIPT_NAME} is stopped: no pids were found" + one_failed=true + break + fi + local node=`basename "$pid_file" .pid` local pid=`cat "$pid_file"` local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'` if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then echo "bad pid file ($pid_file)" + one_failed=true else local failed= kill -0 $pid 2> /dev/null || failed=true if [ "$failed" ]; then - echo "celeryd (node $node) (pid $pid) is stopped, but pid file exists!" + echo "${SCRIPT_NAME} (node $node) (pid $pid) is stopped, but pid file exists!" one_failed=true else - echo "celeryd (node $node) (pid $pid) is running..." + echo "${SCRIPT_NAME} (node $node) (pid $pid) is running..." 
fi fi done @@ -211,24 +346,42 @@ case "$1" in check_paths restart_workers ;; + + graceful) + check_dev_null + restart_workers_graceful + ;; + + kill) + check_dev_null + kill_workers + ;; + + dryrun) + check_dev_null + dryrun + ;; + try-restart) check_dev_null check_paths restart_workers ;; + create-paths) check_dev_null create_paths ;; + check-paths) check_dev_null check_paths ;; + *) - echo "Usage: /etc/init.d/celeryd {start|stop|restart|kill|create-paths}" + echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|graceful|kill|dryrun|create-paths}" exit 64 # EX_USAGE ;; esac exit 0 - diff --git a/orchestra/models/utils.py b/orchestra/models/utils.py index 9b3710ab..445b9619 100644 --- a/orchestra/models/utils.py +++ b/orchestra/models/utils.py @@ -47,3 +47,25 @@ def get_field_value(obj, field_name): rel = getattr(rel.get(), name) return rel + +def get_model_field_path(origin, target): + """ BFS search on model relation fields """ + mqueue = [] + mqueue.append([origin]) + pqueue = [[]] + while mqueue: + model = mqueue.pop(0) + path = pqueue.pop(0) + if len(model) > 4: + raise RuntimeError('maximum recursion depth exceeded while looking for %s' % target) + node = model[-1] + if node == target: + return path + for field in node._meta.fields: + if field.rel: + new_model = list(model) + new_model.append(field.rel.to) + mqueue.append(new_model) + new_path = list(path) + new_path.append(field.name) + pqueue.append(new_path)