From b8819b7a4a7875d207e22345ce96d39585e12690 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Fri, 27 May 2022 16:48:40 +0200 Subject: [PATCH] change is_temporary, is_outgoing, is_incoming --- ereuse_devicehub/resources/lot/models.py | 254 +++++++++++++++-------- 1 file changed, 162 insertions(+), 92 deletions(-) diff --git a/ereuse_devicehub/resources/lot/models.py b/ereuse_devicehub/resources/lot/models.py index 88c68626..ef6577a8 100644 --- a/ereuse_devicehub/resources/lot/models.py +++ b/ereuse_devicehub/resources/lot/models.py @@ -5,12 +5,11 @@ from typing import Union from boltons import urlutils from citext import CIText from flask import g -from flask_login import current_user from sqlalchemy import TEXT from sqlalchemy.dialects.postgresql import UUID from sqlalchemy_utils import LtreeType from sqlalchemy_utils.types.ltree import LQUERY -from teal.db import CASCADE_OWN, UUIDLtree, check_range, IntEnum +from teal.db import CASCADE_OWN, IntEnum, UUIDLtree, check_range from teal.resource import url_for_resource from ereuse_devicehub.db import create_view, db, exp, f @@ -21,70 +20,88 @@ from ereuse_devicehub.resources.user.models import User class Lot(Thing): - id = db.Column(UUID(as_uuid=True), primary_key=True) # uuid is generated on init by default + id = db.Column( + UUID(as_uuid=True), primary_key=True + ) # uuid is generated on init by default name = db.Column(CIText(), nullable=False) description = db.Column(CIText()) description.comment = """A comment about the lot.""" closed = db.Column(db.Boolean, default=False, nullable=False) closed.comment = """A closed lot cannot be modified anymore.""" - devices = db.relationship(Device, - backref=db.backref('lots', lazy=True, collection_class=set), - secondary=lambda: LotDevice.__table__, - lazy=True, - collection_class=set) + devices = db.relationship( + Device, + backref=db.backref('lots', lazy=True, collection_class=set), + secondary=lambda: LotDevice.__table__, + lazy=True, + collection_class=set, + ) """The **children** devices that the lot has. Note that the lot can have more devices, if they are inside descendant lots. """ - parents = db.relationship(lambda: Lot, - viewonly=True, - lazy=True, - collection_class=set, - secondary=lambda: LotParent.__table__, - primaryjoin=lambda: Lot.id == LotParent.child_id, - secondaryjoin=lambda: LotParent.parent_id == Lot.id, - cascade='refresh-expire', # propagate changes outside ORM - backref=db.backref('children', - viewonly=True, - lazy=True, - cascade='refresh-expire', - collection_class=set) - ) + parents = db.relationship( + lambda: Lot, + viewonly=True, + lazy=True, + collection_class=set, + secondary=lambda: LotParent.__table__, + primaryjoin=lambda: Lot.id == LotParent.child_id, + secondaryjoin=lambda: LotParent.parent_id == Lot.id, + cascade='refresh-expire', # propagate changes outside ORM + backref=db.backref( + 'children', + viewonly=True, + lazy=True, + cascade='refresh-expire', + collection_class=set, + ), + ) """The parent lots.""" - all_devices = db.relationship(Device, - viewonly=True, - lazy=True, - collection_class=set, - secondary=lambda: LotDeviceDescendants.__table__, - primaryjoin=lambda: Lot.id == LotDeviceDescendants.ancestor_lot_id, - secondaryjoin=lambda: LotDeviceDescendants.device_id == Device.id) + all_devices = db.relationship( + Device, + viewonly=True, + lazy=True, + collection_class=set, + secondary=lambda: LotDeviceDescendants.__table__, + primaryjoin=lambda: Lot.id == LotDeviceDescendants.ancestor_lot_id, + secondaryjoin=lambda: LotDeviceDescendants.device_id == Device.id, + ) """All devices, including components, inside this lot and its descendants. """ amount = db.Column(db.Integer, check_range('amount', min=0, max=100), default=0) - owner_id = db.Column(UUID(as_uuid=True), - db.ForeignKey(User.id), - nullable=False, - default=lambda: g.user.id) + owner_id = db.Column( + UUID(as_uuid=True), + db.ForeignKey(User.id), + nullable=False, + default=lambda: g.user.id, + ) owner = db.relationship(User, primaryjoin=owner_id == User.id) - transfer_state = db.Column(IntEnum(TransferState), default=TransferState.Initial, nullable=False) + transfer_state = db.Column( + IntEnum(TransferState), default=TransferState.Initial, nullable=False + ) transfer_state.comment = TransferState.__doc__ - receiver_address = db.Column(CIText(), - db.ForeignKey(User.email), - nullable=False, - default=lambda: g.user.email) + receiver_address = db.Column( + CIText(), + db.ForeignKey(User.email), + nullable=False, + default=lambda: g.user.email, + ) receiver = db.relationship(User, primaryjoin=receiver_address == User.email) - def __init__(self, name: str, closed: bool = closed.default.arg, - description: str = None) -> None: + def __init__( + self, name: str, closed: bool = closed.default.arg, description: str = None + ) -> None: """Initializes a lot :param name: :param closed: """ - super().__init__(id=uuid.uuid4(), name=name, closed=closed, description=description) + super().__init__( + id=uuid.uuid4(), name=name, closed=closed, description=description + ) Path(self) # Lots have always one edge per default. @property @@ -102,20 +119,32 @@ class Lot(Thing): @property def is_temporary(self): - return not bool(self.trade) + return not bool(self.trade) and not bool(self.transfer) @property def is_incoming(self): - return bool(self.trade and self.trade.user_to == current_user) + if self.trade: + return self.trade.user_to == g.user + if self.transfer: + return self.transfer.user_to == g.user + + return False @property def is_outgoing(self): - return bool(self.trade and self.trade.user_from == current_user) + if self.trade: + return self.trade.user_from == g.user + if self.transfer: + return self.transfer.user_from == g.user + + return False @classmethod def descendantsq(cls, id): _id = UUIDLtree.convert(id) - return (cls.id == Path.lot_id) & Path.path.lquery(exp.cast('*.{}.*'.format(_id), LQUERY)) + return (cls.id == Path.lot_id) & Path.path.lquery( + exp.cast('*.{}.*'.format(_id), LQUERY) + ) @classmethod def roots(cls): @@ -176,13 +205,17 @@ class Lot(Thing): if isinstance(child, Lot): return Path.has_lot(self.id, child.id) elif isinstance(child, Device): - device = db.session.query(LotDeviceDescendants) \ - .filter(LotDeviceDescendants.device_id == child.id) \ - .filter(LotDeviceDescendants.ancestor_lot_id == self.id) \ + device = ( + db.session.query(LotDeviceDescendants) + .filter(LotDeviceDescendants.device_id == child.id) + .filter(LotDeviceDescendants.ancestor_lot_id == self.id) .one_or_none() + ) return device else: - raise TypeError('Lot only contains devices and lots, not {}'.format(child.__class__)) + raise TypeError( + 'Lot only contains devices and lots, not {}'.format(child.__class__) + ) def __repr__(self) -> str: return ''.format(self) @@ -192,35 +225,44 @@ class LotDevice(db.Model): device_id = db.Column(db.BigInteger, db.ForeignKey(Device.id), primary_key=True) lot_id = db.Column(UUID(as_uuid=True), db.ForeignKey(Lot.id), primary_key=True) created = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - author_id = db.Column(UUID(as_uuid=True), - db.ForeignKey(User.id), - nullable=False, - default=lambda: g.user.id) + author_id = db.Column( + UUID(as_uuid=True), + db.ForeignKey(User.id), + nullable=False, + default=lambda: g.user.id, + ) author = db.relationship(User, primaryjoin=author_id == User.id) author_id.comment = """The user that put the device in the lot.""" class Path(db.Model): - id = db.Column(db.UUID(as_uuid=True), - primary_key=True, - server_default=db.text('gen_random_uuid()')) + id = db.Column( + db.UUID(as_uuid=True), + primary_key=True, + server_default=db.text('gen_random_uuid()'), + ) lot_id = db.Column(db.UUID(as_uuid=True), db.ForeignKey(Lot.id), nullable=False) - lot = db.relationship(Lot, - backref=db.backref('paths', - lazy=True, - collection_class=set, - cascade=CASCADE_OWN), - primaryjoin=Lot.id == lot_id) + lot = db.relationship( + Lot, + backref=db.backref( + 'paths', lazy=True, collection_class=set, cascade=CASCADE_OWN + ), + primaryjoin=Lot.id == lot_id, + ) path = db.Column(LtreeType, nullable=False) - created = db.Column(db.TIMESTAMP(timezone=True), server_default=db.text('CURRENT_TIMESTAMP')) + created = db.Column( + db.TIMESTAMP(timezone=True), server_default=db.text('CURRENT_TIMESTAMP') + ) created.comment = """When Devicehub created this.""" __table_args__ = ( # dag.delete_edge needs to disable internally/temporarily the unique constraint - db.UniqueConstraint(path, name='path_unique', deferrable=True, initially='immediate'), + db.UniqueConstraint( + path, name='path_unique', deferrable=True, initially='immediate' + ), db.Index('path_gist', path, postgresql_using='gist'), db.Index('path_btree', path, postgresql_using='btree'), - db.Index('lot_id_index', lot_id, postgresql_using='hash') + db.Index('lot_id_index', lot_id, postgresql_using='hash'), ) def __init__(self, lot: Lot) -> None: @@ -243,7 +285,9 @@ class Path(db.Model): child_id = UUIDLtree.convert(child_id) return bool( db.session.execute( - "SELECT 1 from path where path ~ '*.{}.*.{}.*'".format(parent_id, child_id) + "SELECT 1 from path where path ~ '*.{}.*.{}.*'".format( + parent_id, child_id + ) ).first() ) @@ -263,47 +307,73 @@ class LotDeviceDescendants(db.Model): """Ancestor lot table.""" _desc = Lot.__table__.alias() """Descendant lot table.""" - lot_device = _desc \ - .join(LotDevice, _desc.c.id == LotDevice.lot_id) \ - .join(Path, _desc.c.id == Path.lot_id) + lot_device = _desc.join(LotDevice, _desc.c.id == LotDevice.lot_id).join( + Path, _desc.c.id == Path.lot_id + ) """Join: Path -- Lot -- LotDevice""" - descendants = "path.path ~ (CAST('*.'|| replace(CAST({}.id as text), '-', '_') " \ - "|| '.*' AS LQUERY))".format(_ancestor.name) + descendants = ( + "path.path ~ (CAST('*.'|| replace(CAST({}.id as text), '-', '_') " + "|| '.*' AS LQUERY))".format(_ancestor.name) + ) """Query that gets the descendants of the ancestor lot.""" - devices = db.select([ - LotDevice.device_id, - _desc.c.id.label('parent_lot_id'), - _ancestor.c.id.label('ancestor_lot_id'), - None - ]).select_from(_ancestor).select_from(lot_device).where(db.text(descendants)) + devices = ( + db.select( + [ + LotDevice.device_id, + _desc.c.id.label('parent_lot_id'), + _ancestor.c.id.label('ancestor_lot_id'), + None, + ] + ) + .select_from(_ancestor) + .select_from(lot_device) + .where(db.text(descendants)) + ) # Components _parent_device = Device.__table__.alias(name='parent_device') """The device that has the access to the lot.""" - lot_device_component = lot_device \ - .join(_parent_device, _parent_device.c.id == LotDevice.device_id) \ - .join(Component, _parent_device.c.id == Component.parent_id) + lot_device_component = lot_device.join( + _parent_device, _parent_device.c.id == LotDevice.device_id + ).join(Component, _parent_device.c.id == Component.parent_id) """Join: Path -- Lot -- LotDevice -- ParentDevice (Device) -- Component""" - components = db.select([ - Component.id.label('device_id'), - _desc.c.id.label('parent_lot_id'), - _ancestor.c.id.label('ancestor_lot_id'), - LotDevice.device_id.label('device_parent_id'), - ]).select_from(_ancestor).select_from(lot_device_component).where(db.text(descendants)) + components = ( + db.select( + [ + Component.id.label('device_id'), + _desc.c.id.label('parent_lot_id'), + _ancestor.c.id.label('ancestor_lot_id'), + LotDevice.device_id.label('device_parent_id'), + ] + ) + .select_from(_ancestor) + .select_from(lot_device_component) + .where(db.text(descendants)) + ) __table__ = create_view('lot_device_descendants', devices.union(components)) class LotParent(db.Model): - i = f.index(Path.path, db.func.text2ltree(f.replace(exp.cast(Path.lot_id, TEXT), '-', '_'))) + i = f.index( + Path.path, db.func.text2ltree(f.replace(exp.cast(Path.lot_id, TEXT), '-', '_')) + ) __table__ = create_view( 'lot_parent', - db.select([ - Path.lot_id.label('child_id'), - exp.cast(f.replace(exp.cast(f.subltree(Path.path, i - 1, i), TEXT), '_', '-'), - UUID).label('parent_id') - ]).select_from(Path).where(i > 0), + db.select( + [ + Path.lot_id.label('child_id'), + exp.cast( + f.replace( + exp.cast(f.subltree(Path.path, i - 1, i), TEXT), '_', '-' + ), + UUID, + ).label('parent_id'), + ] + ) + .select_from(Path) + .where(i > 0), )