django-orchestra-test/orchestra/contrib/systemusers/tests/functional_tests/tests.py

388 lines
15 KiB
Python

import ftplib
import os
import re
import time
from functools import partial
from unittest import skip
import paramiko
from django.conf import settings as djsettings
from django.core.management.base import CommandError
from django.core.urlresolvers import reverse
from selenium.webdriver.support.select import Select
from orchestra.admin.utils import change_url
from orchestra.contrib.accounts.models import Account
from orchestra.contrib.orchestration.models import Server, Route
from orchestra.utils.sys import run, sshrun
from orchestra.utils.tests import (BaseLiveServerTestCase, random_ascii, snapshot_on_error,
save_response_on_error)
from ... import backends
from ...models import SystemUser
r = partial(run, silent=True, display=False)
sshr = partial(sshrun, silent=True, display=False)
class SystemUserMixin(object):
MASTER_SERVER = os.environ.get('ORCHESTRA_MASTER_SERVER', 'localhost')
DEPENDENCIES = (
'orchestra.contrib.orchestration',
'orcgestra.apps.systemusers',
)
def setUp(self):
super(SystemUserMixin, self).setUp()
self.add_route()
djsettings.DEBUG = True
def add_route(self):
master = Server.objects.create(name=self.MASTER_SERVER)
backend = backends.UNIXUserController.get_name()
Route.objects.create(backend=backend, match=True, host=master)
def save(self):
raise NotImplementedError
def add(self):
raise NotImplementedError
def delete(self):
raise NotImplementedError
def update(self):
raise NotImplementedError
def disable(self):
raise NotImplementedError
def add_group(self, username, groupname):
raise NotImplementedError
def validate_user(self, username):
idcmd = sshr(self.MASTER_SERVER, "id %s" % username)
self.assertEqual(0, idcmd.exit_code)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
groups.append(user.username)
idgroups = idcmd.stdout.strip().split(' ')[2]
idgroups = re.findall(r'\d+\((\w+)\)', idgroups)
self.assertEqual(set(groups), set(idgroups))
def validate_delete(self, username):
self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'id %s' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/groups' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/passwd' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False)
# Home will be deleted on account delete, see test_delete_account
def validate_ftp(self, username, password):
ftp = ftplib.FTP(self.MASTER_SERVER)
ftp.login(user=username, passwd=password)
ftp.close()
def validate_sftp(self, username, password):
transport = paramiko.Transport((self.MASTER_SERVER, 22))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.listdir()
sftp.close()
def validate_ssh(self, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.MASTER_SERVER, username=username, password=password)
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('ls')
self.assertEqual(0, channel.recv_exit_status())
channel.close()
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_add(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_user(username)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_ftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(self.delete, username)
self.assertRaises(paramiko.AuthenticationException,
self.validate_sftp, username, password)
self.assertRaises(paramiko.AuthenticationException,
self.validate_ssh, username, password)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_sftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/rssh')
self.addCleanup(self.delete, username)
self.validate_sftp(username, password)
self.assertRaises(AssertionError, self.validate_ssh, username, password)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_ssh(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/bash')
self.addCleanup(self.delete, username)
self.validate_ssh(username, password)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_delete(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%sppppP001' % random_ascii(5)
self.add(username, password)
self.validate_user(username)
self.delete(username)
self.validate_delete(username)
self.assertRaises(Exception, self.delete, self.account.username)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_add_group(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_user(username)
username2 = '%s_systemuser' % random_ascii(10)
password2 = '@!?%spppP001' % random_ascii(5)
self.add(username2, password2)
self.addCleanup(self.delete, username2)
self.validate_user(username2)
self.add_group(username, username2)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
self.assertIn(username2, groups)
self.validate_user(username)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_disable(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
self.disable(username)
self.validate_user(username)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
@skip("Skip because not exists get_auth_token in orm.api.Api")
def test_change_password(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
new_password = '@!?%spppP001' % random_ascii(5)
self.change_password(username, new_password)
self.validate_ftp(username, new_password)
# TODO test resources
class RESTSystemUserMixin(SystemUserMixin):
def setUp(self):
super(RESTSystemUserMixin, self).setUp()
self.rest_login()
# create main user
self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username)
@save_response_on_error
def add(self, username, password, shell='/dev/null'):
self.rest.systemusers.create(username=username, password=password, shell=shell)
@save_response_on_error
def delete(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.delete()
@save_response_on_error
def add_group(self, username, groupname):
user = self.rest.systemusers.retrieve(username=username).get()
user.groups.append({'username': groupname})
user.save()
@save_response_on_error
def disable(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.is_active = False
user.save()
@save_response_on_error
def save(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.save()
@save_response_on_error
def change_password(self, username, password):
user = self.rest.systemusers.retrieve(username=username).get()
user.set_password(password)
def delete_account(self, username):
self.rest.account.delete()
class AdminSystemUserMixin(SystemUserMixin):
def setUp(self):
super(AdminSystemUserMixin, self).setUp()
self.admin_login()
# create main user
self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username)
@snapshot_on_error
def add(self, username, password, shell='/dev/null'):
url = self.live_server_url + reverse('admin:systemusers_systemuser_add')
self.selenium.get(url)
username_field = self.selenium.find_element_by_id('id_username')
username_field.send_keys(username)
password_field = self.selenium.find_element_by_id('id_password1')
password_field.send_keys(password)
password_field = self.selenium.find_element_by_id('id_password2')
password_field.send_keys(password)
shell_input = self.selenium.find_element_by_id('id_shell')
shell_select = Select(shell_input)
shell_select.select_by_value(shell)
username_field.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def delete(self, username):
user = SystemUser.objects.get(username=username)
self.admin_delete(user)
@snapshot_on_error
def delete_account(self, username):
account = Account.objects.get(username=username)
self.admin_delete(account)
@snapshot_on_error
def disable(self, username):
user = SystemUser.objects.get(username=username)
self.admin_disable(user)
@snapshot_on_error
def add_group(self, username, groupname):
user = SystemUser.objects.get(username=username)
url = self.live_server_url + change_url(user)
self.selenium.get(url)
groups = self.selenium.find_element_by_id('id_groups_add_all_link')
groups.click()
time.sleep(0.5)
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def save(self, username):
user = SystemUser.objects.get(username=username)
url = self.live_server_url + change_url(user)
self.selenium.get(url)
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def change_password(self, username, password):
user = SystemUser.objects.get(username=username)
self.admin_change_password(user, password)
class RESTSystemUserTest(RESTSystemUserMixin, BaseLiveServerTestCase):
pass
class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
@skip("Skip because not exists get_auth_token in orm.api.Api")
@snapshot_on_error
def test_create_account(self):
url = self.live_server_url + reverse('admin:accounts_account_add')
self.selenium.get(url)
account_username = '%s_account' % random_ascii(10)
username = self.selenium.find_element_by_id('id_username')
username.send_keys(account_username)
account_password = '@!?%spppP001' % random_ascii(5)
password = self.selenium.find_element_by_id('id_password1')
password.send_keys(account_password)
password = self.selenium.find_element_by_id('id_password2')
password.send_keys(account_password)
full_name = random_ascii(10)
full_name_field = self.selenium.find_element_by_id('id_full_name')
full_name_field.send_keys(full_name)
account_email = 'orchestra@orchestra.lan'
email = self.selenium.find_element_by_id('id_email')
email.send_keys(account_email)
contact_short_name = random_ascii(10)
short_name = self.selenium.find_element_by_id('id_contacts-0-short_name')
short_name.send_keys(contact_short_name)
email = self.selenium.find_element_by_id('id_contacts-0-email')
email.send_keys(account_email)
email.submit()
self.assertNotEqual(url, self.selenium.current_url)
self.addCleanup(self.delete_account, account_username)
self.assertEqual(0, sshr(self.MASTER_SERVER, "id %s" % account_username).exit_code)
@skip("Skip because not exists get_auth_token in orm.api.Api")
@snapshot_on_error
def test_delete_account(self):
home = self.account.main_systemuser.get_home()
self.admin_delete(self.account)
self.assertRaises(CommandError, run, 'ls %s' % home, display=False)
# Recreate a fucking fake account for test cleanup
self.account = self.create_account(username=self.account.username, superuser=True)
self.selenium.delete_all_cookies()
self.admin_login()
@skip("Skip because not exists get_auth_token in orm.api.Api")
@snapshot_on_error
def test_disable_account(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
self.disable(username)
self.validate_user(username)
disable = reverse('admin:accounts_account_disable', args=(self.account.pk,))
url = self.live_server_url + disable
self.selenium.get(url)
confirmation = self.selenium.find_element_by_name('post')
confirmation.submit()
self.assertNotEqual(url, self.selenium.current_url)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
self.selenium.get(url)
self.assertNotEqual(url, self.selenium.current_url)
# Reenable for test cleanup
self.account.is_active = True
self.account.save()
self.admin_login()