diff --git a/TODO.md b/TODO.md
index 78d2cfdd..59614868 100644
--- a/TODO.md
+++ b/TODO.md
@@ -343,7 +343,6 @@ TODO mount the filesystem with "nosuid" option
# virtdomains file is not ideal, prevent fake/error on domains there! and make sure this file is required!
# Deprecate restart/start/stop services (do touch wsgi.py and fuck celery)
-# orchestrate async stdout stderr (inspired on pangea managemengt commands)
orchestra-beat support for uwsgi cron
make django admin taskstate uncollapse fucking traceback, ( if exists ?)
@@ -356,5 +355,7 @@ resorce monitoring more efficient, less mem an better queries for calc current d
# best_price rating method
+# paramiko arcfour cypher
-# error reporting on periodic tasks
+ciphers=['arcfour128', 'aes256']
+http://paramiko-docs.readthedocs.org/en/latest/api/transport.html
diff --git a/orchestra/contrib/orchestration/management/commands/orchestrate.py b/orchestra/contrib/orchestration/management/commands/orchestrate.py
index 3eb204d6..33cf0d0b 100644
--- a/orchestra/contrib/orchestration/management/commands/orchestrate.py
+++ b/orchestra/contrib/orchestration/management/commands/orchestrate.py
@@ -113,6 +113,6 @@ class Command(BaseCommand):
stderr = cstderr
if log.has_finished:
running.remove(log)
- time.sleep(0.1)
+ time.sleep(0.05)
for log in logs:
self.stdout.write(' '.join((log.backend, log.state)))
diff --git a/orchestra/contrib/orchestration/manager.py b/orchestra/contrib/orchestration/manager.py
index fc980c16..3136353e 100644
--- a/orchestra/contrib/orchestration/manager.py
+++ b/orchestra/contrib/orchestration/manager.py
@@ -25,6 +25,7 @@ def keep_log(execute, log, operations):
""" send report """
# Remember that threads have their oun connection poll
# No need to EVER temper with the transaction here
+ log = kwargs['log']
try:
log = execute(*args, **kwargs)
if log.state != log.SUCCESS:
@@ -116,11 +117,11 @@ def execute(scripts, serialize=False, async=None):
backend, operations = value
args = (route.host,)
if async is None:
- async = not serialize and route.async
+ is_async = not serialize and route.async
else:
- async = not serialize and async
+ is_async = not serialize and async
kwargs = {
- 'async': async,
+ 'async': is_async,
}
# we clone the connection just in case we are isolated inside a transaction
with db.clone(model=BackendLog) as handle:
@@ -136,7 +137,7 @@ def execute(scripts, serialize=False, async=None):
task = db.close_connection(task)
thread = threading.Thread(target=task, args=args, kwargs=kwargs)
thread.start()
- if not async:
+ if not is_async:
threads_to_join.append(thread)
logs.append(log)
[ thread.join() for thread in threads_to_join ]
diff --git a/orchestra/contrib/orchestration/methods.py b/orchestra/contrib/orchestration/methods.py
index 6141dcb5..938db0e5 100644
--- a/orchestra/contrib/orchestration/methods.py
+++ b/orchestra/contrib/orchestration/methods.py
@@ -10,82 +10,52 @@ import paramiko
from celery.datastructures import ExceptionInfo
from django.conf import settings as djsettings
-from orchestra.utils.python import CaptureStdout
+from orchestra.utils.sys import sshrun
+from orchestra.utils.python import CaptureStdout, import_class
from . import settings
logger = logging.getLogger(__name__)
-transports = {}
+paramiko_connections = {}
-def SSH(backend, log, server, cmds, async=False):
+def Paramiko(backend, log, server, cmds, async=False):
"""
- Executes cmds to remote server using SSH
-
- The script is first copied using SCP in order to overflood the channel with large scripts
- Then the script is executed using the defined backend.script_executable
+ Executes cmds to remote server using Pramaiko
"""
script = '\n'.join(cmds)
script = script.replace('\r', '')
- bscript = script.encode('utf-8')
- digest = hashlib.md5(bscript).hexdigest()
- path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_DIR, digest)
- remote_path = "%s.remote" % path
- # Ensure unique local paths for each file because of problems when os.remove(path)
- path += '@%s' % str(server)
log.state = log.STARTED
- log.script = '# %s\n%s' % (remote_path, script)
+ log.script = script
log.save(update_fields=('script', 'state'))
if not cmds:
return
channel = None
ssh = None
try:
- # Avoid "Argument list too long" on large scripts by genereting a file
- # and scping it to the remote server
- with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as handle:
- handle.write(bscript)
-
- # ssh connection
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
addr = server.get_address()
- key = settings.ORCHESTRATION_SSH_KEY_PATH
- try:
- ssh.connect(addr, username='root', key_filename=key, timeout=10)
- except socket.error as e:
- logger.error('%s timed out on %s' % (backend, addr))
- log.state = log.TIMEOUT
- log.stderr = str(e)
- log.save(update_fields=['state', 'stderr'])
- return
+ # ssh connection
+ ssh = paramiko_connections.get(addr)
+ if not ssh:
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ key = settings.ORCHESTRATION_SSH_KEY_PATH
+ try:
+ ssh.connect(addr, username='root', key_filename=key)
+ except socket.error as e:
+ logger.error('%s timed out on %s' % (backend, addr))
+ log.state = log.TIMEOUT
+ log.stderr = str(e)
+ log.save(update_fields=['state', 'stderr'])
+ return
+ paramiko_connections[addr] = ssh
transport = ssh.get_transport()
-
- # Copy script to remote server
- sftp = paramiko.SFTPClient.from_transport(transport)
- sftp.put(path, remote_path)
- sftp.chmod(remote_path, 0o600)
- sftp.close()
- os.remove(path)
-
- # Execute it
- context = {
- 'executable': backend.script_executable,
- 'remote_path': remote_path,
- 'digest': digest,
- 'remove': '' if djsettings.DEBUG else "rm -fr %(remote_path)s\n",
- }
- cmd = (
- "[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && %(executable)s %(remote_path)s\n"
- "RETURN_CODE=$?\n"
- "%(remove)s"
- "exit $RETURN_CODE" % context
- )
channel = transport.open_session()
- channel.exec_command(cmd)
-
+ channel.exec_command(backend.script_executable)
+ channel.sendall(script)
+ channel.shutdown_write()
# Log results
logger.debug('%s running on %s' % (backend, server))
if async:
@@ -112,8 +82,8 @@ def SSH(backend, log, server, cmds, async=False):
log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
- log.exit_code = exit_code = channel.recv_exit_status()
- log.state = log.SUCCESS if exit_code == 0 else log.FAILURE
+ log.exit_code = channel.recv_exit_status()
+ log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
log.save()
except:
@@ -128,8 +98,55 @@ def SSH(backend, log, server, cmds, async=False):
log.save(update_fields=['state'])
if channel is not None:
channel.close()
- if ssh is not None:
- ssh.close()
+
+
+def OpenSSH(backend, log, server, cmds, async=False):
+ """
+ Executes cmds to remote server using SSH with connection resuse for maximum performance
+ """
+ script = '\n'.join(cmds)
+ script = script.replace('\r', '')
+ log.state = log.STARTED
+ log.script = script
+ log.save(update_fields=('script', 'state'))
+ if not cmds:
+ return
+ channel = None
+ ssh = None
+ try:
+ ssh = sshrun(server.get_address(), script, executable=backend.script_executable,
+ persist=True, async=async)
+ logger.debug('%s running on %s' % (backend, server))
+ if async:
+ second = False
+ for state in ssh:
+ log.stdout += state.stdout.decode('utf8')
+ log.stderr += state.stderr.decode('utf8')
+ log.save()
+ log.exit_code = state.exit_code
+ else:
+ log.stdout = ssh.stdout
+ log.stderr = ssh.stderr
+ log.exit_code = ssh.exit_code
+ log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
+ logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
+ log.save()
+ except:
+ log.state = log.ERROR
+ log.traceback = ExceptionInfo(sys.exc_info()).traceback
+ logger.error('Exception while executing %s on %s' % (backend, server))
+ logger.debug(log.traceback)
+ log.save()
+ finally:
+ if log.state == log.STARTED:
+ log.state = log.ABORTED
+ log.save(update_fields=['state'])
+
+
+def SSH(*args, **kwargs):
+ """ facade function enabling to chose between multiple SSH backends"""
+ method = import_class(settings.ORCHESTRATION_SSH_METHOD_BACKEND)
+ return method(*args, **kwargs)
def Python(backend, log, server, cmds, async=False):
diff --git a/orchestra/contrib/orchestration/models.py b/orchestra/contrib/orchestration/models.py
index b8281b78..a5c735d9 100644
--- a/orchestra/contrib/orchestration/models.py
+++ b/orchestra/contrib/orchestration/models.py
@@ -98,7 +98,6 @@ class BackendLog(models.Model):
def backend_class(self):
return ServiceBackend.get_backend(self.backend)
-
class BackendOperation(models.Model):
diff --git a/orchestra/contrib/orchestration/settings.py b/orchestra/contrib/orchestration/settings.py
index cea71474..7539dc8b 100644
--- a/orchestra/contrib/orchestration/settings.py
+++ b/orchestra/contrib/orchestration/settings.py
@@ -1,5 +1,7 @@
from os import path
+from django.utils.translation import ugettext_lazy as _
+
from orchestra.contrib.settings import Setting
@@ -28,10 +30,6 @@ ORCHESTRATION_ROUTER = Setting('ORCHESTRATION_ROUTER',
)
-ORCHESTRATION_TEMP_SCRIPT_DIR = Setting('ORCHESTRATION_TEMP_SCRIPT_DIR',
- '/dev/shm'
-)
-
ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION',
False
@@ -41,3 +39,13 @@ ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION',
ORCHESTRATION_BACKEND_CLEANUP_DAYS = Setting('ORCHESTRATION_BACKEND_CLEANUP_DAYS',
7
)
+
+
+ORCHESTRATION_SSH_METHOD_BACKEND = Setting('ORCHESTRATION_SSH_METHOD_BACKEND',
+ 'orchestra.contrib.orchestration.methods.OpenSSH',
+ help_text=_("Two methods provided:
"
+ "orchestra.contrib.orchestration.methods.OpenSSH with ControlPersist.
"
+ "orchestra.contrib.orchestration.methods.Paramiko with connection pool.
"
+ "Both perform similarly, but OpenSSH has the advantage that the connections are shared between workers,
"
+ "Paramiko, in contrast, has a per worker connection pool.")
+)
diff --git a/orchestra/contrib/resources/backends.py b/orchestra/contrib/resources/backends.py
index 41dd3f39..644e5c3e 100644
--- a/orchestra/contrib/resources/backends.py
+++ b/orchestra/contrib/resources/backends.py
@@ -69,6 +69,8 @@ class ServiceMonitor(ServiceBackend):
except ValueError:
cls_name = self.__class__.__name__
raise ValueError("%s expected ' ' got '%s'" % (cls_name, line))
+ if isinstance(value, bytes):
+ value = value.decode('ascii')
MonitorData.objects.create(monitor=name, object_id=object_id,
content_type=ct, value=value, created_at=self.current_date)
diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py
index 1865f4e9..abaef876 100644
--- a/orchestra/utils/sys.py
+++ b/orchestra/utils/sys.py
@@ -102,6 +102,7 @@ def runiterator(command, display=False, stdin=b''):
p.stderr.close()
raise StopIteration
+
def join(iterator, display=False, silent=False, valid_codes=(0,)):
""" joins the iterator process """
stdout = b''
@@ -136,13 +137,20 @@ def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async
next(iterator)
if async:
return iterator
- return join(iterator, display=display, silent=silent, valid_codes=valie_codes)
+ return join(iterator, display=display, silent=silent, valid_codes=valid_codes)
-def sshrun(addr, command, *args, **kwargs):
- command = command.replace("'", """'"'"'""")
- cmd = "ssh -o stricthostkeychecking=no -C root@%s '%s'" % (addr, command)
- return run(cmd, *args, **kwargs)
+def sshrun(addr, command, *args, executable='bash', persist=False, **kwargs):
+ options = ['stricthostkeychecking=no']
+ if persist:
+ options.extend((
+ 'ControlMaster=auto',
+ 'ControlPersist=yes',
+ 'ControlPath=~/.ssh/orchestra-%r-%h-%p',
+ ))
+ cmd = 'ssh -o {options} -C root@{addr} {executable}'.format(options=' -o '.join(options),
+ addr=addr, executable=executable)
+ return run(cmd, *args, stdin=command.encode('utf8'), **kwargs)
def get_default_celeryd_username():
@@ -202,6 +210,6 @@ class LockFile(object):
self.release()
-def touch_wsgi():
+def touch_wsgi(delay=5):
from . import paths
- run('{ sleep 2 && touch %s/wsgi.py; } &' % paths.get_project_dir(), async=True)
+ run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True)