django-orchestra/orchestra/contrib/systemusers/tests/functional_tests/tests.py

379 lines
14 KiB
Python

import ftplib
import os
import re
import time
import unittest
from functools import partial
import paramiko
from django.conf import settings as djsettings
from django.core.management.base import CommandError
from django.urls import reverse
from selenium.webdriver.support.select import Select
from orchestra.admin.utils import change_url
from orchestra.contrib.accounts.models import Account
from orchestra.contrib.orchestration.models import Server, Route
from orchestra.utils.sys import run, sshrun
from orchestra.utils.tests import (BaseLiveServerTestCase, random_ascii, snapshot_on_error,
save_response_on_error)
from ... import backends
from ...models import SystemUser
TEST_REST_API = int(os.getenv('TEST_REST_API', '0'))
r = partial(run, silent=True, display=False)
sshr = partial(sshrun, silent=True, display=False)
class SystemUserMixin(object):
MASTER_SERVER = os.environ.get('ORCHESTRA_MASTER_SERVER', 'localhost')
DEPENDENCIES = (
'orchestra.contrib.orchestration',
'orcgestra.apps.systemusers',
)
def setUp(self):
super(SystemUserMixin, self).setUp()
self.add_route()
djsettings.DEBUG = True
def add_route(self):
master = Server.objects.create(name=self.MASTER_SERVER)
backend = backends.UNIXUserController.get_name()
Route.objects.create(backend=backend, match=True, host=master)
def save(self):
raise NotImplementedError
def add(self):
raise NotImplementedError
def delete(self):
raise NotImplementedError
def update(self):
raise NotImplementedError
def disable(self):
raise NotImplementedError
def add_group(self, username, groupname):
raise NotImplementedError
def validate_user(self, username):
idcmd = sshr(self.MASTER_SERVER, "id %s" % username)
self.assertEqual(0, idcmd.exit_code)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
groups.append(user.username)
idgroups = idcmd.stdout.strip().split(' ')[2]
idgroups = re.findall(r'\d+\((\w+)\)', idgroups)
self.assertEqual(set(groups), set(idgroups))
def validate_delete(self, username):
self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'id %s' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/groups' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/passwd' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False)
# Home will be deleted on account delete, see test_delete_account
def validate_ftp(self, username, password):
ftp = ftplib.FTP(self.MASTER_SERVER)
ftp.login(user=username, passwd=password)
ftp.close()
def validate_sftp(self, username, password):
transport = paramiko.Transport((self.MASTER_SERVER, 22))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.listdir()
sftp.close()
def validate_ssh(self, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.MASTER_SERVER, username=username, password=password)
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('ls')
self.assertEqual(0, channel.recv_exit_status())
channel.close()
def test_add(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_user(username)
def test_ftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(self.delete, username)
self.assertRaises(paramiko.AuthenticationException,
self.validate_sftp, username, password)
self.assertRaises(paramiko.AuthenticationException,
self.validate_ssh, username, password)
def test_sftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/rssh')
self.addCleanup(self.delete, username)
self.validate_sftp(username, password)
self.assertRaises(AssertionError, self.validate_ssh, username, password)
def test_ssh(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/bash')
self.addCleanup(self.delete, username)
self.validate_ssh(username, password)
def test_delete(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%sppppP001' % random_ascii(5)
self.add(username, password)
self.validate_user(username)
self.delete(username)
self.validate_delete(username)
self.assertRaises(Exception, self.delete, self.account.username)
def test_add_group(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_user(username)
username2 = '%s_systemuser' % random_ascii(10)
password2 = '@!?%spppP001' % random_ascii(5)
self.add(username2, password2)
self.addCleanup(self.delete, username2)
self.validate_user(username2)
self.add_group(username, username2)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
self.assertIn(username2, groups)
self.validate_user(username)
def test_disable(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
self.disable(username)
self.validate_user(username)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
def test_change_password(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
new_password = '@!?%spppP001' % random_ascii(5)
self.change_password(username, new_password)
self.validate_ftp(username, new_password)
# TODO test resources
@unittest.skipUnless(TEST_REST_API, "REST API tests")
class RESTSystemUserMixin(SystemUserMixin):
def setUp(self):
super(RESTSystemUserMixin, self).setUp()
self.rest_login()
# create main user
self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username)
@save_response_on_error
def add(self, username, password, shell='/dev/null'):
self.rest.systemusers.create(username=username, password=password, shell=shell)
@save_response_on_error
def delete(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.delete()
@save_response_on_error
def add_group(self, username, groupname):
user = self.rest.systemusers.retrieve(username=username).get()
user.groups.append({'username': groupname})
user.save()
@save_response_on_error
def disable(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.is_active = False
user.save()
@save_response_on_error
def save(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.save()
@save_response_on_error
def change_password(self, username, password):
user = self.rest.systemusers.retrieve(username=username).get()
user.set_password(password)
def delete_account(self, username):
self.rest.account.delete()
class AdminSystemUserMixin(SystemUserMixin):
def setUp(self):
super(AdminSystemUserMixin, self).setUp()
self.admin_login()
# create main user
self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username)
@snapshot_on_error
def add(self, username, password, shell='/dev/null'):
url = self.live_server_url + reverse('admin:systemusers_systemuser_add')
self.selenium.get(url)
username_field = self.selenium.find_element_by_id('id_username')
username_field.send_keys(username)
password_field = self.selenium.find_element_by_id('id_password1')
password_field.send_keys(password)
password_field = self.selenium.find_element_by_id('id_password2')
password_field.send_keys(password)
shell_input = self.selenium.find_element_by_id('id_shell')
shell_select = Select(shell_input)
shell_select.select_by_value(shell)
username_field.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def delete(self, username):
user = SystemUser.objects.get(username=username)
self.admin_delete(user)
@snapshot_on_error
def delete_account(self, username):
account = Account.objects.get(username=username)
self.admin_delete(account)
@snapshot_on_error
def disable(self, username):
user = SystemUser.objects.get(username=username)
self.admin_disable(user)
@snapshot_on_error
def add_group(self, username, groupname):
user = SystemUser.objects.get(username=username)
url = self.live_server_url + change_url(user)
self.selenium.get(url)
groups = self.selenium.find_element_by_id('id_groups_add_all_link')
groups.click()
time.sleep(0.5)
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def save(self, username):
user = SystemUser.objects.get(username=username)
url = self.live_server_url + change_url(user)
self.selenium.get(url)
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def change_password(self, username, password):
user = SystemUser.objects.get(username=username)
self.admin_change_password(user, password)
class RESTSystemUserTest(RESTSystemUserMixin, BaseLiveServerTestCase):
pass
class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
@snapshot_on_error
def test_create_account(self):
url = self.live_server_url + reverse('admin:accounts_account_add')
self.selenium.get(url)
account_username = '%s_account' % random_ascii(10)
username = self.selenium.find_element_by_id('id_username')
username.send_keys(account_username)
account_password = '@!?%spppP001' % random_ascii(5)
password = self.selenium.find_element_by_id('id_password1')
password.send_keys(account_password)
password = self.selenium.find_element_by_id('id_password2')
password.send_keys(account_password)
full_name = random_ascii(10)
full_name_field = self.selenium.find_element_by_id('id_full_name')
full_name_field.send_keys(full_name)
account_email = 'orchestra@orchestra.lan'
email = self.selenium.find_element_by_id('id_email')
email.send_keys(account_email)
contact_short_name = random_ascii(10)
short_name = self.selenium.find_element_by_id('id_contacts-0-short_name')
short_name.send_keys(contact_short_name)
email = self.selenium.find_element_by_id('id_contacts-0-email')
email.send_keys(account_email)
email.submit()
self.assertNotEqual(url, self.selenium.current_url)
self.addCleanup(self.delete_account, account_username)
self.assertEqual(0, sshr(self.MASTER_SERVER, "id %s" % account_username).exit_code)
@snapshot_on_error
def test_delete_account(self):
home = self.account.main_systemuser.get_home()
self.admin_delete(self.account)
self.assertRaises(CommandError, run, 'ls %s' % home, display=False)
# Recreate a fucking fake account for test cleanup
self.account = self.create_account(username=self.account.username, superuser=True)
self.selenium.delete_all_cookies()
self.admin_login()
@snapshot_on_error
def test_disable_account(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(self.delete, username)
self.validate_ftp(username, password)
self.disable(username)
self.validate_user(username)
disable = reverse('admin:accounts_account_disable', args=(self.account.pk,))
url = self.live_server_url + disable
self.selenium.get(url)
confirmation = self.selenium.find_element_by_name('post')
confirmation.submit()
self.assertNotEqual(url, self.selenium.current_url)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
self.selenium.get(url)
self.assertNotEqual(url, self.selenium.current_url)
# Reenable for test cleanup
self.account.is_active = True
self.account.save()
self.admin_login()