2019-10-07 14:33:48 +00:00
|
|
|
"""passbook reputation request signals"""
|
2019-03-03 19:26:25 +00:00
|
|
|
from django.contrib.auth.signals import user_logged_in, user_login_failed
|
|
|
|
from django.dispatch import receiver
|
|
|
|
from ipware import get_client_ip
|
2019-10-01 08:24:10 +00:00
|
|
|
from structlog import get_logger
|
2019-03-03 19:26:25 +00:00
|
|
|
|
|
|
|
from passbook.core.models import User
|
2019-10-07 14:33:48 +00:00
|
|
|
from passbook.policies.reputation.models import IPReputation, UserReputation
|
2019-03-03 19:26:25 +00:00
|
|
|
|
2019-10-04 08:08:53 +00:00
|
|
|
LOGGER = get_logger()
|
2019-03-03 19:26:25 +00:00
|
|
|
|
|
|
|
|
2019-07-22 13:46:41 +00:00
|
|
|
def get_remote_ip(request):
|
|
|
|
"""Small wrapper of get_client_ip to catch errors"""
|
|
|
|
try:
|
|
|
|
remote_ip, _ = get_client_ip(request)
|
|
|
|
if remote_ip:
|
|
|
|
return remote_ip
|
|
|
|
if 'ip' in request:
|
|
|
|
return request['ip']
|
|
|
|
except (AttributeError, ValueError):
|
|
|
|
pass
|
|
|
|
return '255.255.255.255'
|
|
|
|
|
2019-03-03 19:26:25 +00:00
|
|
|
def update_score(request, username, amount):
|
|
|
|
"""Update score for IP and User"""
|
2019-07-22 13:46:41 +00:00
|
|
|
remote_ip = get_remote_ip(request)
|
2019-10-07 14:33:48 +00:00
|
|
|
ip_score, _ = IPReputation.objects.update_or_create(ip=remote_ip)
|
2019-03-03 19:26:25 +00:00
|
|
|
ip_score.score += amount
|
|
|
|
ip_score.save()
|
2019-10-04 08:21:33 +00:00
|
|
|
LOGGER.debug("Updated score", amount=amount, for_ip=remote_ip)
|
2019-03-03 19:26:25 +00:00
|
|
|
user = User.objects.filter(username=username)
|
|
|
|
if not user.exists():
|
|
|
|
return
|
2019-10-07 14:33:48 +00:00
|
|
|
user_score, _ = UserReputation.objects.update_or_create(user=user.first())
|
2019-03-03 19:26:25 +00:00
|
|
|
user_score.score += amount
|
|
|
|
user_score.save()
|
2019-10-04 08:21:33 +00:00
|
|
|
LOGGER.debug("Updated score", amount=amount, for_user=username)
|
2019-03-03 19:26:25 +00:00
|
|
|
|
|
|
|
@receiver(user_login_failed)
|
|
|
|
def handle_failed_login(sender, request, credentials, **kwargs):
|
|
|
|
"""Lower Score for failed loging attempts"""
|
|
|
|
update_score(request, credentials.get('username'), -1)
|
|
|
|
|
|
|
|
@receiver(user_logged_in)
|
|
|
|
def handle_successful_login(sender, request, user, **kwargs):
|
|
|
|
"""Raise score for successful attempts"""
|
|
|
|
update_score(request, user.username, 1)
|