Changeset View
Changeset View
Standalone View
Standalone View
looper/models.py
| import datetime | import datetime | ||||
| import logging | import logging | ||||
| from typing import ( | from typing import ( | ||||
| Any, | Any, | ||||
| Dict, | Dict, | ||||
| Iterable, | Iterable, | ||||
| Mapping, | Mapping, | ||||
| Optional, | Optional, | ||||
| Sequence, | |||||
| Set, | Set, | ||||
| TYPE_CHECKING, | |||||
| Tuple, | Tuple, | ||||
| Type, | Type, | ||||
| Union, | Union, | ||||
| cast, | cast, | ||||
| ) | ) | ||||
| import dateutil.relativedelta | |||||
| from django.contrib.auth.models import User | from django.contrib.auth.models import User | ||||
| from django.db import models | from django.db import models | ||||
| import django.utils.timezone | from django.db.models import Model | ||||
| from django.urls import reverse | from django.urls import reverse | ||||
| from django.utils.html import format_html | from django.utils.html import format_html | ||||
| from django_countries.fields import CountryField | from django_countries.fields import CountryField | ||||
| import dateutil.relativedelta | |||||
| import django.utils.timezone | |||||
| from looper import admin_log, form_fields, gateways, model_mixins | from looper import admin_log, form_fields, gateways, model_mixins | ||||
| from looper.decorators import requires_status | from looper.decorators import requires_status | ||||
| from looper.money import Money, CurrencyMismatch | from looper.money import Money, CurrencyMismatch | ||||
| import looper.exceptions | import looper.exceptions | ||||
| import looper.signals | import looper.signals | ||||
| DEFAULT_CURRENCY = 'EUR' | DEFAULT_CURRENCY = 'EUR' | ||||
| CURRENCIES = ( | CURRENCIES = ( | ||||
| ('USD', 'USD'), | ('USD', 'USD'), | ||||
| ('EUR', 'EUR'), | ('EUR', 'EUR'), | ||||
| ) | ) | ||||
| log = logging.getLogger(__name__) | log = logging.getLogger(__name__) | ||||
| UpdateFieldsType = Optional[Set[str]] | UpdateFieldsType = Optional[Set[str]] | ||||
| """The type used by Django's save(update_field=...) parameter.""" | """The type used by Django's save(update_field=...) parameter.""" | ||||
| class CurrencyField(models.CharField): | class CurrencyField(models.CharField): | ||||
| def __init__(self, **kwargs): | def __init__(self, **kwargs: Any) -> None: | ||||
| kwargs.setdefault('max_length', 3) | kwargs.setdefault('max_length', 3) | ||||
| kwargs.setdefault('choices', CURRENCIES) | kwargs.setdefault('choices', CURRENCIES) | ||||
| kwargs.setdefault('default', DEFAULT_CURRENCY) | kwargs.setdefault('default', DEFAULT_CURRENCY) | ||||
| super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
| class MoneyFieldDescriptor: | class MoneyFieldDescriptor: | ||||
| """Ensures that a MoneyField produces/accepts a Money instance. | """Ensures that a MoneyField produces/accepts a Money instance. | ||||
| A MoneyField accepts either an int (number of cents) or a Money instance. | A MoneyField accepts either an int (number of cents) or a Money instance. | ||||
| In the former case the currency is taken from the 'currency_field' | In the former case the currency is taken from the 'currency_field' | ||||
| attribute of the model instance. In the latter case that same attribute | attribute of the model instance. In the latter case that same attribute | ||||
| is used to ensure consistent currencies. | is used to ensure consistent currencies. | ||||
| """ | """ | ||||
| def __init__(self, field): | def __init__(self, field: 'MoneyField') -> None: | ||||
| self.field = field | self.field: 'MoneyField' = field # type: ignore | ||||
| def __get__(self, instance, cls=None) -> Optional[Money]: | def __get__(self, instance: object, cls: Optional[Type[object]] = None) -> Optional[Money]: | ||||
| if instance is None: | if instance is None: | ||||
| return None | return None | ||||
| try: | try: | ||||
| return instance.__dict__[self.field.name] | return cast(Optional[Money], instance.__dict__[self.field.name]) | ||||
| except KeyError as ex: | except KeyError as ex: | ||||
| raise AttributeError(f'No such attribute: {ex!s}') | raise AttributeError(f'No such attribute: {ex!s}') | ||||
| def __set__(self, instance, value: Union[None, int, Money]): | def __set__(self, instance: object, value: Union[None, int, Money]) -> None: | ||||
| try: | try: | ||||
| instance_currency = instance.__dict__[self.field.currency_field] | instance_currency = instance.__dict__[self.field.currency_field] | ||||
| except KeyError: | except KeyError: | ||||
| raise AttributeError(f'{instance!r} has no property {self.field.currency_field}') | raise AttributeError(f'{instance!r} has no property {self.field.currency_field}') | ||||
| if value is None: | if value is None: | ||||
| value_to_set = None | value_to_set = None | ||||
| elif isinstance(value, int): | elif isinstance(value, int): | ||||
| Show All 22 Lines | class MoneyField(models.IntegerField): | ||||
| To use this field, the field referenced by the 'currency_field' | To use this field, the field referenced by the 'currency_field' | ||||
| parameter must exist already. That is, it must be declared on the | parameter must exist already. That is, it must be declared on the | ||||
| model class before the MoneyField() instance that references it. | model class before the MoneyField() instance that references it. | ||||
| To keep mypy happy, add a type declaration 'fieldname: Money' when | To keep mypy happy, add a type declaration 'fieldname: Money' when | ||||
| using this model field. | using this model field. | ||||
| """ | """ | ||||
| def __init__(self, *args, currency_field='currency', **kwargs): | def __init__(self, *args: Any, currency_field: str = 'currency', **kwargs: Any) -> None: | ||||
| self.currency_field = currency_field | self.currency_field: str = currency_field | ||||
| super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
| def deconstruct(self): | def deconstruct(self) -> Any: | ||||
| """Return parameters for __init__ to recreate this field.""" | """Return parameters for __init__ to recreate this field.""" | ||||
| name, path, args, kwargs = super().deconstruct() | name, path, args, kwargs = super().deconstruct() | ||||
| kwargs['currency_field'] = self.currency_field | kwargs['currency_field'] = self.currency_field | ||||
| return name, path, args, kwargs | return name, path, args, kwargs | ||||
| def get_prep_value(self, value: Union[None, int, Money]) -> Optional[int]: | def get_prep_value(self, value: Union[None, int, Money]) -> Optional[int]: | ||||
| """Return field's value prepared for saving into a database.""" | """Return field's value prepared for saving into a database.""" | ||||
| if value is None: | if value is None: | ||||
| return None | return None | ||||
| if isinstance(value, int): | if isinstance(value, int): | ||||
| # The default value for an amount can be given as an integer. | # The default value for an amount can be given as an integer. | ||||
| return value | return value | ||||
| assert isinstance(value, Money) | assert isinstance(value, Money) | ||||
| return value.cents | return value.cents | ||||
| def contribute_to_class(self, cls, name, **kwargs): | def contribute_to_class(self, cls: Type['Model'], name: str, **kwargs: Any): | ||||
| """Ensure our descriptor is called when getting/setting the attribute.""" | """Ensure our descriptor is called when getting/setting the attribute.""" | ||||
| super().contribute_to_class(cls, name, **kwargs) | super().contribute_to_class(cls, name, **kwargs) | ||||
| setattr(cls, self.name, MoneyFieldDescriptor(self)) | setattr(cls, self.name, MoneyFieldDescriptor(self)) | ||||
| def formfield(self, **kwargs): | def formfield(self, **kwargs: Any) -> Any: | ||||
| """Default to MoneyFormField if form_class is not specified.""" | """Default to MoneyFormField if form_class is not specified.""" | ||||
| return super().formfield( | return super().formfield( | ||||
| **{ | **{ | ||||
| 'form_class': form_fields.MoneyFormField, | 'form_class': form_fields.MoneyFormField, | ||||
| **kwargs, | **kwargs, | ||||
| 'widget': form_fields.MoneyInput, | 'widget': form_fields.MoneyInput, | ||||
| } | } | ||||
| ) | ) | ||||
| def to_python(self, value: Union[None, int, str, Money]) -> Optional[int]: | def to_python(self, value: Union[None, int, str, Money]) -> Optional[int]: | ||||
| """The result of this function feeds the MoneyFieldDescriptor. | """The result of this function feeds the MoneyFieldDescriptor. | ||||
| I think. | I think. | ||||
| Note that this function is also used when deserialising values from | Note that this function is also used when deserialising values from | ||||
| JSON, in which case the Money value was serialised to the string | JSON, in which case the Money value was serialised to the string | ||||
| '{currency}\u00A0{amount}'. | '{currency}\u00A0{amount}'. | ||||
| """ | """ | ||||
| if isinstance(value, str): | if isinstance(value, str): | ||||
| return Money.from_str(value).cents | return Money.from_str(value).cents | ||||
| if isinstance(value, Money): | if isinstance(value, Money): | ||||
| return value.cents | return value.cents | ||||
| return super().to_python(value) | return cast(Optional[int], super().to_python(value)) | ||||
| def get_attname_column(self): | def get_attname_column(self) -> Tuple[str, str]: | ||||
| """Add '_in_cents' to the database column name.""" | """Add '_in_cents' to the database column name.""" | ||||
| attname, column = super().get_attname_column() | attname, column = super().get_attname_column() # type: ignore | ||||
| return attname, f'{column}_in_cents' | return attname, f'{column}_in_cents' | ||||
| class Customer(models.Model): | class Customer(models.Model): | ||||
| """Extended properties for the existing user. | """Extended properties for the existing user. | ||||
| It references: | It references: | ||||
| - addresses | - addresses | ||||
| ▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def gateway_customer_id_get_or_create(self, gateway: 'Gateway') -> str: | ||||
| except GatewayCustomerId.DoesNotExist: | except GatewayCustomerId.DoesNotExist: | ||||
| gateway_customer_id = gateway.provider.customer_create() | gateway_customer_id = gateway.provider.customer_create() | ||||
| gateway_customer = self.user.gatewaycustomerid_set.create( | gateway_customer = self.user.gatewaycustomerid_set.create( | ||||
| gateway_customer_id=gateway_customer_id, gateway=gateway | gateway_customer_id=gateway_customer_id, gateway=gateway | ||||
| ) | ) | ||||
| return gateway_customer.gateway_customer_id | return gateway_customer.gateway_customer_id | ||||
| def payment_method_add( | def payment_method_add( | ||||
| self, payment_method_nonce: str, gateway: 'Gateway', **payment_options | self, payment_method_nonce: str, gateway: 'Gateway', **payment_options: Any | ||||
| ) -> 'PaymentMethod': | ) -> 'PaymentMethod': | ||||
| """Add a payment method for the customer on the gateway. | """Add a payment method for the customer on the gateway. | ||||
| :raise: looper.exceptions.GatewayError if the payment method could not be added. | :raise: looper.exceptions.GatewayError if the payment method could not be added. | ||||
| """ | """ | ||||
| # Check if the customer already exists on the provided gateway. | # Check if the customer already exists on the provided gateway. | ||||
| gateway_customer_id = self.gateway_customer_id_get_or_create(gateway) | gateway_customer_id = self.gateway_customer_id_get_or_create(gateway) | ||||
| pm_info = gateway.provider.payment_method_create( | pm_info = gateway.provider.payment_method_create( | ||||
| payment_method_nonce, gateway_customer_id, **payment_options | payment_method_nonce, gateway_customer_id, **payment_options | ||||
| ) | ) | ||||
| # I'd rather just query and handle duplicates in the django.db.IntegrityError exception | # I'd rather just query and handle duplicates in the django.db.IntegrityError exception | ||||
| # handler, but Django doesn't allow me to do queries there (raises a | # handler, but Django doesn't allow me to do queries there (raises a | ||||
| # TransactionManagementError). | # TransactionManagementError). | ||||
| paymeth: PaymentMethod = self.user.paymentmethod_set.filter( | paymeth: Optional[PaymentMethod] = self.user.paymentmethod_set.filter( | ||||
| gateway=gateway, token=pm_info.token | gateway=gateway, token=pm_info.token | ||||
| ).first() | ).first() | ||||
| if not paymeth: | if not paymeth: | ||||
| self._log.debug('Storing new payment method for customer pk=%d', self.pk) | self._log.debug('Storing new payment method for customer pk=%d', self.pk) | ||||
| method_type = pm_info.type_for_database() | method_type = pm_info.type_for_database() | ||||
| assert isinstance(method_type, str), f'expected str, not {method_type!r}' | assert isinstance(method_type, str), f'expected str, not {method_type!r}' | ||||
| recog_name = pm_info.recognisable_name() | recog_name = pm_info.recognisable_name() | ||||
| return self.user.paymentmethod_set.create( | return self.user.paymentmethod_set.create( | ||||
| gateway=gateway, | gateway=gateway, | ||||
| token=pm_info.token, | token=pm_info.token, | ||||
| method_type=method_type, | method_type=method_type, | ||||
| recognisable_name=recog_name, | recognisable_name=recog_name, | ||||
| ) | ) | ||||
| if paymeth.is_deleted: | if paymeth.is_deleted: | ||||
| # We're re-adding a previously-deleted payment method, so just restore it. | # We're re-adding a previously-deleted payment method, so just restore it. | ||||
| paymeth.undelete() | paymeth.undelete() | ||||
| self._log.debug('Updating existing payment method for customer pk=%d', self.pk) | self._log.debug('Updating existing payment method for customer pk=%d', self.pk) | ||||
| paymeth.recognisable_name = pm_info.recognisable_name() | paymeth.recognisable_name = pm_info.recognisable_name() | ||||
| paymeth.method_type = pm_info.type_for_database() | paymeth.method_type = pm_info.type_for_database() | ||||
| paymeth.save(update_fields={'recognisable_name', 'method_type'}) | paymeth.save(update_fields={'recognisable_name', 'method_type'}) | ||||
| return paymeth | return paymeth | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| if not self.full_name: | if not self.full_name: | ||||
| return f'Customer {self.pk}' | return f'Customer {self.pk}' | ||||
| return self.full_name | return self.full_name | ||||
| class Gateway(models.Model): | class Gateway(models.Model): | ||||
| """Payment gateway data. | """Payment gateway data. | ||||
| Show All 10 Lines | class Gateway(models.Model): | ||||
| form_description = models.TextField( | form_description = models.TextField( | ||||
| blank=True, | blank=True, | ||||
| help_text='Shown in payment forms when this gateway is selected. This is interpreted as ' | help_text='Shown in payment forms when this gateway is selected. This is interpreted as ' | ||||
| 'HTML, so be careful and HTML-escape what needs to be HTML-escaped.', | 'HTML, so be careful and HTML-escape what needs to be HTML-escaped.', | ||||
| ) | ) | ||||
| log = log.getChild('Gateway') | log = log.getChild('Gateway') | ||||
| @classmethod | @classmethod | ||||
| def default(cls): | def default(cls) -> 'Gateway': | ||||
| return cls.objects.get(is_default=True) | return cls.objects.get(is_default=True) | ||||
| @property | @property | ||||
| def provider(self) -> gateways.AbstractPaymentGateway: | def provider(self) -> gateways.AbstractPaymentGateway: | ||||
| return gateways.Registry.instance_for(self.name) | return gateways.Registry.instance_for(self.name) | ||||
| def save(self, *args, **kwargs): | def save(self, *args: Any, **kwargs: Any) -> None: | ||||
| """Ensure that only one Gateway is the default one.""" | """Ensure that only one Gateway is the default one.""" | ||||
| if self.is_default: | if self.is_default: | ||||
| for old_default in Gateway.objects.filter(is_default=True): | for old_default in Gateway.objects.filter(is_default=True): | ||||
| self.log.info('Switching default gateway from %r to %r', old_default, self) | self.log.info('Switching default gateway from %r to %r', old_default, self) | ||||
| old_default.is_default = False | old_default.is_default = False | ||||
| old_default.save(update_fields={'is_default'}) | old_default.save(update_fields={'is_default'}) | ||||
| super().save(*args, **kwargs) | super().save(*args, **kwargs) | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return self.name | return self.name | ||||
| class GatewayCustomerId(models.Model): | class GatewayCustomerId(models.Model): | ||||
| """Identity of the Customer on the payment Gatweay. | """Identity of the Customer on the payment Gatweay. | ||||
| Stores a reference to a customer, to fetch payment methods on the Gateway. | Stores a reference to a customer, to fetch payment methods on the Gateway. | ||||
| We could do without this table, specifying to the Gateway our Customer id, | We could do without this table, specifying to the Gateway our Customer id, | ||||
| ▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def gateway_properties(self) -> Optional[looper.gateways.PaymentMethodInfo]: | ||||
| type_for_db = pm_info.type_for_database() | type_for_db = pm_info.type_for_database() | ||||
| if type_for_db != self.method_type: | if type_for_db != self.method_type: | ||||
| log.debug('Updating method_type of payment method pk=%d', self.pk) | log.debug('Updating method_type of payment method pk=%d', self.pk) | ||||
| self.method_type = type_for_db | self.method_type = type_for_db | ||||
| self.save(update_fields={'method_type'}) | self.save(update_fields={'method_type'}) | ||||
| return pm_info | return pm_info | ||||
| def delete(self, *args, using=None, **kwargs): | def delete(self, *args: Any, using: Any = None, **kwargs: Any) -> Tuple[int, Dict[str, int]]: | ||||
| """Soft-delete instead of really deleting. | """Soft-delete instead of really deleting. | ||||
| This allows us to keep references to this payment method. | This allows us to keep references to this payment method. | ||||
| Note that deleting does really delete the payment method at the | Note that deleting does really delete the payment method at the | ||||
| payment gateway; the record we keep is for our own history only. | payment gateway; the record we keep is for our own history only. | ||||
| """ | """ | ||||
| # TODO check if the payment method is connected to a pending subscription or an order | # TODO check if the payment method is connected to a pending subscription or an order | ||||
| # TODO(Sybren): handle exceptions communicating with the gateway. | # TODO(Sybren): handle exceptions communicating with the gateway. | ||||
| self.gateway.provider.payment_method_delete(self.token) | self.gateway.provider.payment_method_delete(self.token) | ||||
| log.debug('Soft-deleting payment method pk=%d', self.pk) | log.debug('Soft-deleting payment method pk=%d', self.pk) | ||||
| self.is_deleted = True | self.is_deleted = True | ||||
| self.save(using=using, update_fields={'is_deleted'}) | self.save(using=using, update_fields={'is_deleted'}) | ||||
| def undelete(self): | return None # type: ignore | ||||
| def undelete(self) -> None: | |||||
| """Un-soft-deletes the instance. | """Un-soft-deletes the instance. | ||||
| Note that this assumes the user has re-added the same payment method | Note that this assumes the user has re-added the same payment method | ||||
| at the payment gateway, and that this resulted in the same payment | at the payment gateway, and that this resulted in the same payment | ||||
| token. Otherwise it should just be a new PaymentMethod instance. | token. Otherwise it should just be a new PaymentMethod instance. | ||||
| """ | """ | ||||
| log.debug('Un-deleting payment method pk=%d', self.pk) | log.debug('Un-deleting payment method pk=%d', self.pk) | ||||
| self.is_deleted = False | self.is_deleted = False | ||||
| self.save(update_fields={'is_deleted'}) | self.save(update_fields={'is_deleted'}) | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| as_str = f'{self.gateway.name} – {self.recognisable_name or self.method_type}' | as_str = f'{self.gateway.name} – {self.recognisable_name or self.method_type}' | ||||
| if self.is_deleted: | if self.is_deleted: | ||||
| return f'{as_str} (deleted)' | return f'{as_str} (deleted)' | ||||
| return as_str | return as_str | ||||
| class Address(models.Model): | class Address(models.Model): | ||||
| # These are the fields used to create forms, and publicly displayed | # These are the fields used to create forms, and publicly displayed | ||||
| Show All 30 Lines | class Address(models.Model): | ||||
| ) | ) | ||||
| country = CountryField(blank=True) | country = CountryField(blank=True) | ||||
| @property | @property | ||||
| def public_values(self) -> Iterable[str]: | def public_values(self) -> Iterable[str]: | ||||
| return (getattr(self, f) for f in self.PUBLIC_FIELDS) | return (getattr(self, f) for f in self.PUBLIC_FIELDS) | ||||
| @property | @property | ||||
| def as_dict(self): | def as_dict(self) -> Dict[str, object]: | ||||
| return {f: getattr(self, f) for f in self.PUBLIC_FIELDS} | return {f: getattr(self, f) for f in self.PUBLIC_FIELDS} | ||||
| def __str__(self) -> str: | def __str__(self) -> str: | ||||
| return f'{self.category} address of {self.user}'.capitalize() | return f'{self.category} address of {self.user}'.capitalize() | ||||
| def as_text(self) -> str: | def as_text(self) -> str: | ||||
| from django_countries.fields import Country | from django_countries.fields import Country | ||||
| def to_str(element: object) -> str: | def to_str(element: object) -> str: | ||||
| if isinstance(element, Country): | if isinstance(element, Country): | ||||
| return cast(str, element.name) | return cast(str, element.name) | ||||
| return str(element) | return str(element) | ||||
| return '\n'.join(to_str(elem) for elem in self.public_values if elem) | return '\n'.join(to_str(elem) for elem in self.public_values if elem) | ||||
| class Product(model_mixins.CreatedUpdatedMixin, models.Model): | class Product(model_mixins.CreatedUpdatedMixin, models.Model): | ||||
| """Can be a good or a service, like 'Development Fund'.""" | """Can be a good or a service, like 'Development Fund'.""" | ||||
| name = models.CharField(max_length=200) | name = models.CharField(max_length=200) | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return self.name | return self.name | ||||
| class Plan(models.Model): | class Plan(models.Model): | ||||
| """Acts as a variation of the product, like 'Gold' for 'Development Fund'.""" | """Acts as a variation of the product, like 'Gold' for 'Development Fund'.""" | ||||
| product = models.ForeignKey(Product, on_delete=models.CASCADE) | product = models.ForeignKey(Product, on_delete=models.CASCADE) | ||||
| name = models.CharField(max_length=255) # User-facing name | name = models.CharField(max_length=255) # User-facing name | ||||
| description = models.CharField(max_length=255) | description = models.CharField(max_length=255) | ||||
| is_active = models.BooleanField(default=True) | is_active = models.BooleanField(default=True) | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return self.name | return self.name | ||||
| def variation_for_currency(self, currency: str) -> Optional['PlanVariation']: | def variation_for_currency(self, currency: str) -> Optional['PlanVariation']: | ||||
| """Determine the default variation for the given currency. | """Determine the default variation for the given currency. | ||||
| Returns None when there is no plan variation for the currency at all. | Returns None when there is no plan variation for the currency at all. | ||||
| """ | """ | ||||
| default_variation = ( | default_variation = ( | ||||
| self.variations.active() | self.variations.active() | ||||
| .filter(currency=currency) | .filter(currency=currency) | ||||
| .order_by('-is_default_for_currency', '-price') | .order_by('-is_default_for_currency', '-price') | ||||
| .first() | .first() | ||||
| ) | ) | ||||
| return default_variation | return default_variation | ||||
| class PlanVariationManager(models.Manager): | class PlanVariationManager(models.Manager): | ||||
| def active(self) -> models.QuerySet: | def active(self) -> 'models.QuerySet[PlanVariation]': | ||||
| """Return QuerySet of only the active plan variations.""" | """Return QuerySet of only the active plan variations.""" | ||||
| return self.filter(is_active=True) | return self.filter(is_active=True) | ||||
| class PlanVariation(models.Model): | class PlanVariation(models.Model): | ||||
| DEFAULT_INTERVAL_UNIT = 'month' | DEFAULT_INTERVAL_UNIT = 'month' | ||||
| INTERVAL_UNITS = ( | INTERVAL_UNITS = ( | ||||
| ('day', 'Day'), | ('day', 'Day'), | ||||
| ('week', 'Week'), | ('week', 'Week'), | ||||
| ('month', 'Month'), | ('month', 'Month'), | ||||
| ('year', 'Year'), | ('year', 'Year'), | ||||
| ) | ) | ||||
| DEFAULT_COLLECTION_METHOD = 'automatic' | DEFAULT_COLLECTION_METHOD = 'automatic' | ||||
| COLLECTION_METHODS = ( | COLLECTION_METHODS = ( | ||||
| ('automatic', 'Automatic'), | ('automatic', 'Automatic'), | ||||
| ('manual', 'Manual'), | ('manual', 'Manual'), | ||||
| ) | ) | ||||
| plan = models.ForeignKey(Plan, on_delete=models.CASCADE, related_name='variations') | plan = models.ForeignKey(Plan, on_delete=models.CASCADE, related_name='variations') | ||||
| currency = CurrencyField() | currency: str = CurrencyField() | ||||
| price: Money = MoneyField(help_text='Including tax.') | price: Money = MoneyField(help_text='Including tax.') | ||||
| interval_unit = models.CharField( | interval_unit = models.CharField( | ||||
| choices=INTERVAL_UNITS, default=DEFAULT_INTERVAL_UNIT, max_length=50 | choices=INTERVAL_UNITS, default=DEFAULT_INTERVAL_UNIT, max_length=50 | ||||
| ) | ) | ||||
| interval_length = models.PositiveIntegerField(default=1) | interval_length = models.PositiveIntegerField(default=1) | ||||
| collection_method = models.CharField( | collection_method = models.CharField( | ||||
| choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | ||||
| ) | ) | ||||
| is_active = models.BooleanField(default=True) | is_active = models.BooleanField(default=True) | ||||
| is_default_for_currency = models.BooleanField(default=False) | is_default_for_currency = models.BooleanField(default=False) | ||||
| objects = PlanVariationManager() | objects: PlanVariationManager = PlanVariationManager() | ||||
| class Meta: | class Meta: | ||||
| unique_together = [ | unique_together = [ | ||||
| ('plan', 'currency', 'interval_unit', 'interval_length', 'collection_method') | ('plan', 'currency', 'interval_unit', 'interval_length', 'collection_method') | ||||
| ] | ] | ||||
| ordering = ('-is_active', 'currency', '-price', 'collection_method') | ordering = ('-is_active', 'currency', '-price', 'collection_method') | ||||
| @property | @property | ||||
| def price_per_month(self) -> Money: | def price_per_month(self) -> Money: | ||||
| """Returns the rounded price per month for this plan variation. | """Returns the rounded price per month for this plan variation. | ||||
| Note that this is a lossy operation, as it rounds down to entire cents. | Note that this is a lossy operation, as it rounds down to entire cents. | ||||
| """ | """ | ||||
| months = { | months = { | ||||
| 'day': 12 / 365, # Approximate average nr of months in a day. | 'day': 12 / 365, # Approximate average nr of months in a day. | ||||
| 'week': 12 / 52, # Approximate average nr of months in a week. | 'week': 12 / 52, # Approximate average nr of months in a week. | ||||
| 'month': 1, | 'month': 1, | ||||
| 'year': 12, | 'year': 12, | ||||
| } | } | ||||
| months_per_interval = months[self.interval_unit] * self.interval_length | months_per_interval = months[self.interval_unit] * self.interval_length | ||||
| return self.price // months_per_interval | return self.price // months_per_interval | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return f"{self.plan.name} - {self.currency} - {self.interval_length} {self.interval_unit}" | return f"{self.plan.name} - {self.currency} - {self.interval_length} {self.interval_unit}" | ||||
| def __repr__(self) -> str: | def __repr__(self) -> str: | ||||
| return ( | return ( | ||||
| f'PlanVariation(pk={self.pk}, plan.name={self.plan.name!r}, price={self.price!r}, ' | f'PlanVariation(pk={self.pk}, plan.name={self.plan.name!r}, price={self.price!r}, ' | ||||
| f'interval_unit={self.interval_unit!r}, interval_length={self.interval_length}, ' | f'interval_unit={self.interval_unit!r}, interval_length={self.interval_length}, ' | ||||
| f'collection_method={self.collection_method})' | f'collection_method={self.collection_method})' | ||||
| ) | ) | ||||
| def save(self, *args, **kwargs): | def save(self, *args: Any, **kwargs: Any) -> None: | ||||
| if self.is_default_for_currency: | if self.is_default_for_currency: | ||||
| # Make sure there is only one default. | # Make sure there is only one default. | ||||
| self.__class__.objects.filter(plan=self.plan, currency=self.currency).exclude( | self.__class__.objects.filter(plan=self.plan, currency=self.currency).exclude( | ||||
| pk=self.pk | pk=self.pk | ||||
| ).update(is_default_for_currency=False) | ).update(is_default_for_currency=False) | ||||
| super().save(*args, **kwargs) | super().save(*args, **kwargs) | ||||
| Show All 13 Lines | COLLECTION_METHODS = ( | ||||
| ('managed', 'Managed (manual by staff)'), | ('managed', 'Managed (manual by staff)'), | ||||
| ) | ) | ||||
| user = models.ForeignKey(User, on_delete=models.CASCADE) | user = models.ForeignKey(User, on_delete=models.CASCADE) | ||||
| payment_method = models.ForeignKey( | payment_method = models.ForeignKey( | ||||
| PaymentMethod, null=True, blank=True, on_delete=models.CASCADE | PaymentMethod, null=True, blank=True, on_delete=models.CASCADE | ||||
| ) | ) | ||||
| currency = CurrencyField() | currency: str = CurrencyField() | ||||
| price: Money = MoneyField(default=0, help_text='Including tax.') | price: Money = MoneyField(default=0, help_text='Including tax.') | ||||
| collection_method = models.CharField( | collection_method = models.CharField( | ||||
| choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | ||||
| ) | ) | ||||
| tax: Money = MoneyField(blank=True, default=0) | tax: Money = MoneyField(blank=True, default=0) | ||||
| tax_type = models.CharField(max_length=20, blank=True, default='') | tax_type = models.CharField(max_length=20, blank=True, default='') | ||||
| tax_region = CountryField(blank=True, default='') | tax_region = CountryField(blank=True, default='') | ||||
| @classmethod | @classmethod | ||||
| def field_names(cls) -> Iterable[str]: | def field_names(cls) -> Iterable[str]: | ||||
| return (f.name for f in cls._meta.fields) | return (f.name for f in cls._meta.fields) | ||||
| class SubscriptionManager(models.Manager): | class SubscriptionManager(models.Manager): | ||||
| def payable(self) -> models.QuerySet: | def payable(self) -> 'models.QuerySet[Subscription]': | ||||
| """Return subscriptions that can still be paid for.""" | """Return subscriptions that can still be paid for.""" | ||||
| return self.filter(status__in={'active', 'on-hold'}) | return self.filter(status__in={'active', 'on-hold'}) | ||||
| # Ignoring type because of the inner Meta class of the two mix-ins. | # Ignoring type because of the inner Meta class of the two mix-ins. | ||||
| class Subscription( # type: ignore | class Subscription( # type: ignore | ||||
| model_mixins.RecordModifcationMixin, | model_mixins.RecordModifcationMixin, | ||||
| model_mixins.CreatedUpdatedMixin, | model_mixins.CreatedUpdatedMixin, | ||||
| ▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | ): | ||||
| last_notification = models.DateTimeField( | last_notification = models.DateTimeField( | ||||
| null=True, | null=True, | ||||
| blank=True, | blank=True, | ||||
| help_text='When Ton was last sent a notification that this subscription passed its next ' | help_text='When Ton was last sent a notification that this subscription passed its next ' | ||||
| 'payment date. Only used for managed subscriptions.', | 'payment date. Only used for managed subscriptions.', | ||||
| ) | ) | ||||
| objects = SubscriptionManager() | objects: SubscriptionManager = SubscriptionManager() | ||||
| @property | @property | ||||
| def monthly_rounded_price(self) -> Money: | def monthly_rounded_price(self) -> Money: | ||||
| """Compute the price per month, rounded to entire cents. | """Compute the price per month, rounded to entire cents. | ||||
| Note that using montly_rounded_price() * 12 to get the | Note that using montly_rounded_price() * 12 to get the | ||||
| yearly price is wrong, due to rounding errors. If you want | yearly price is wrong, due to rounding errors. If you want | ||||
| to know that, just implement yearly_price() or something. | to know that, just implement yearly_price() or something. | ||||
| """ | """ | ||||
| units_per_month = self._UNITS_PER_MONTH[self.interval_unit] | units_per_month = self._UNITS_PER_MONTH[self.interval_unit] | ||||
| price_in_cents = self.price.cents * units_per_month / self.interval_length | price_in_cents = self.price.cents * units_per_month / self.interval_length | ||||
| return Money(currency=self.currency, cents=int(round(price_in_cents))) | return Money(currency=self.currency, cents=int(round(price_in_cents))) | ||||
| @property | @property | ||||
| def interval(self) -> dateutil.relativedelta.relativedelta: | def interval(self) -> dateutil.relativedelta.relativedelta: | ||||
| """Return the renewal interval as relativedelta. | """Return the renewal interval as relativedelta. | ||||
| The relativedelta can be added to regular `datetime.datetime` objects. | The relativedelta can be added to regular `datetime.datetime` objects. | ||||
| """ | """ | ||||
| kwargs = {f"{self.interval_unit}s": self.interval_length} | kwargs = {f"{self.interval_unit}s": self.interval_length} | ||||
| return dateutil.relativedelta.relativedelta(**kwargs) | return dateutil.relativedelta.relativedelta(**kwargs) # type: ignore | ||||
| @property | @property | ||||
| def can_be_activated(self) -> bool: | def can_be_activated(self) -> bool: | ||||
| """Whether a status transition to 'active' is allowed.""" | """Whether a status transition to 'active' is allowed.""" | ||||
| return self.may_transition_to('active') | return self.may_transition_to('active') | ||||
| @property | @property | ||||
| def is_active(self) -> bool: | def is_active(self) -> bool: | ||||
| ▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | def latest_order(self) -> Optional['Order']: | ||||
| subscriptions for which collection_method='managed'; those never have | subscriptions for which collection_method='managed'; those never have | ||||
| any orders attached. | any orders attached. | ||||
| """ | """ | ||||
| try: | try: | ||||
| return self.order_set.latest() | return self.order_set.latest() | ||||
| except Order.DoesNotExist: | except Order.DoesNotExist: | ||||
| return None | return None | ||||
| def save(self, *args, update_fields: UpdateFieldsType = None, **kwargs) -> None: | def save(self, *args: Any, update_fields: UpdateFieldsType = None, **kwargs: Any) -> None: | ||||
| """Act on state changes.""" | """Act on state changes.""" | ||||
| was_changed, old_state = self.pre_save_record() | was_changed, old_state = self.pre_save_record() | ||||
| if was_changed: | if was_changed: | ||||
| self._handle_status_change_pre_save(old_state, update_fields) | self._handle_status_change_pre_save(old_state, update_fields) | ||||
| super().save(*args, update_fields=update_fields, **kwargs) | super().save(*args, update_fields=update_fields, **kwargs) | ||||
| ▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def _handle_user_changed(self, old_user_id: Optional[int]) -> None: | ||||
| order.save(update_fields={'user_id', 'payment_method_id'}) | order.save(update_fields={'user_id', 'payment_method_id'}) | ||||
| for transaction in order.transaction_set.all(): | for transaction in order.transaction_set.all(): | ||||
| transaction.user_id = new_user_id | transaction.user_id = new_user_id | ||||
| transaction.save(update_fields={'user_id'}) | transaction.save(update_fields={'user_id'}) | ||||
| self.payment_method = None | self.payment_method = None | ||||
| self.save(update_fields={'payment_method'}) | self.save(update_fields={'payment_method'}) | ||||
| def _on_cancelled(self, update_fields: UpdateFieldsType): | def _on_cancelled(self, update_fields: UpdateFieldsType) -> None: | ||||
| """Called when the subscription transitions to the 'cancelled' status.""" | """Called when the subscription transitions to the 'cancelled' status.""" | ||||
| self.log.info('Subscription %d was cancelled, setting cancelled_at field too.', self.pk) | self.log.info('Subscription %d was cancelled, setting cancelled_at field too.', self.pk) | ||||
| self.cancelled_at = django.utils.timezone.now() | self.cancelled_at = django.utils.timezone.now() | ||||
| if update_fields is not None: | if update_fields is not None: | ||||
| update_fields.add('cancelled_at') | update_fields.add('cancelled_at') | ||||
| order = self.latest_order() | order = self.latest_order() | ||||
| Show All 30 Lines | def _on_activation_post_save(self, old_status: str) -> None: | ||||
| self.log.debug('Subscription pk=%d was activated', self.pk) | self.log.debug('Subscription pk=%d was activated', self.pk) | ||||
| looper.signals.subscription_activated.send(sender=self, old_status=old_status) | looper.signals.subscription_activated.send(sender=self, old_status=old_status) | ||||
| def _on_deactivation_post_save(self, old_status: str) -> None: | def _on_deactivation_post_save(self, old_status: str) -> None: | ||||
| """Called whenever the subscription becomes inactive.""" | """Called whenever the subscription becomes inactive.""" | ||||
| self.log.debug('Subscription pk=%d was deactivated', self.pk) | self.log.debug('Subscription pk=%d was deactivated', self.pk) | ||||
| looper.signals.subscription_deactivated.send(sender=self, old_status=old_status) | looper.signals.subscription_deactivated.send(sender=self, old_status=old_status) | ||||
| def generate_order(self, *, save=True) -> 'Order': | def generate_order(self, *, save: bool = True) -> 'Order': | ||||
| """Generate an order for the current subscription. | """Generate an order for the current subscription. | ||||
| :param save: save the generated order to the database. | :param save: save the generated order to the database. | ||||
| """ | """ | ||||
| # Dynamically get the fields to copy. | # Dynamically get the fields to copy. | ||||
| field_copies = {name: getattr(self, name) for name in SharedFields.field_names()} | field_copies = {name: getattr(self, name) for name in SharedFields.field_names()} | ||||
| customer: Customer = self.user.customer | customer: Customer = self.user.customer | ||||
| order = Order( | order = Order( | ||||
| subscription=self, | subscription=self, | ||||
| email=customer.billing_email, | email=customer.billing_email, | ||||
| billing_address=customer.billing_address.as_text(), | billing_address=customer.billing_address.as_text(), | ||||
| status='created', | status='created', | ||||
| name=f'{self.plan.product.name} / {self.plan.name}', | name=f'{self.plan.product.name} / {self.plan.name}', | ||||
| **field_copies, | **field_copies, | ||||
| ) | ) | ||||
| if save: | if save: | ||||
| order.save() | order.save() | ||||
| return order | return order | ||||
| def set_next_payment_after_activation(self): | def set_next_payment_after_activation(self) -> None: | ||||
| """Conditionally set `next_payment` to a suitable value and save to database.""" | """Conditionally set `next_payment` to a suitable value and save to database.""" | ||||
| update_fields: UpdateFieldsType = set() | update_fields: Set[str] = set() | ||||
| self._set_next_payment_after_activation(update_fields) | self._set_next_payment_after_activation(update_fields) | ||||
| if update_fields: | if update_fields: | ||||
| self.save(update_fields=update_fields) | self.save(update_fields=update_fields) | ||||
| def _set_next_payment_after_activation(self, update_fields: UpdateFieldsType): | def _set_next_payment_after_activation(self, update_fields: Optional[Set[str]]) -> None: | ||||
| """Conditionally set `next_payment` to a suitable value.""" | """Conditionally set `next_payment` to a suitable value.""" | ||||
| now = django.utils.timezone.now() | now = django.utils.timezone.now() | ||||
| if self.next_payment is not None and self.next_payment > now: | if self.next_payment is not None and self.next_payment > now: | ||||
| self.log.debug( | self.log.debug( | ||||
| 'Subscription %r has future next_payment, not bumping after activation.', self.pk | 'Subscription %r has future next_payment, not bumping after activation.', self.pk | ||||
| ) | ) | ||||
| return | return | ||||
| self.bump_next_payment() | self.bump_next_payment() | ||||
| if update_fields is not None: | if update_fields is not None: | ||||
| update_fields.add('next_payment') | update_fields.add('next_payment') | ||||
| def bump_next_payment(self): | def bump_next_payment(self) -> None: | ||||
| """Bump the 'next_payment' field to one interval from now. | """Bump the 'next_payment' field to one interval from now. | ||||
| Does NOT save this subscription; that's up to the caller. | Does NOT save this subscription; that's up to the caller. | ||||
| """ | """ | ||||
| renewal = django.utils.timezone.now() | renewal = django.utils.timezone.now() | ||||
| self.next_payment = renewal + self.interval | self.next_payment = renewal + self.interval | ||||
| self.log.info('Bumped subscription %r next_payment to %s', self.pk, self.next_payment) | self.log.info('Bumped subscription %r next_payment to %s', self.pk, self.next_payment) | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| try: | try: | ||||
| plan = self.plan.name | plan = self.plan.name | ||||
| except Plan.DoesNotExist: | except Plan.DoesNotExist: | ||||
| plan = '(no plan)' | plan = '(no plan)' | ||||
| try: | try: | ||||
| customer = self.user.customer.full_name | customer = self.user.customer.full_name | ||||
| except Customer.DoesNotExist: | except Customer.DoesNotExist: | ||||
| customer = '(no customer)' | customer = '(no customer)' | ||||
| return f'{self.id} - {customer} - {plan}' | return f'{self.id} - {customer} - {plan}' | ||||
| def attach_log_entry(self, message: str): | def attach_log_entry(self, message: str) -> None: | ||||
| """Attach an admin history log entry.""" | """Attach an admin history log entry.""" | ||||
| admin_log.attach_log_entry(self, message) | admin_log.attach_log_entry(self, message) | ||||
| def extend_subscription(self, *, from_timestamp: datetime.datetime, months: float) -> None: | def extend_subscription(self, *, from_timestamp: datetime.datetime, months: float) -> None: | ||||
| """Extend a subscription by shifting the next_payment date. | """Extend a subscription by shifting the next_payment date. | ||||
| :param from_timestamp: the timestamp to add the number of months to. | :param from_timestamp: the timestamp to add the number of months to. | ||||
| :param months: (possibly fractional) number of months to add. | :param months: (possibly fractional) number of months to add. | ||||
| ▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | def switch_payment_method(self, payment_method: PaymentMethod) -> None: | ||||
| new_method, | new_method, | ||||
| supported, | supported, | ||||
| ) | ) | ||||
| self.collection_method = new_method | self.collection_method = new_method | ||||
| self.save(update_fields={'payment_method', 'collection_method'}) | self.save(update_fields={'payment_method', 'collection_method'}) | ||||
| class OrderManager(models.Manager): | class OrderManager(models.Manager): | ||||
| def paid(self) -> models.QuerySet: | def paid(self) -> 'models.QuerySet[Order]': | ||||
| """Return only paid orders, so status in 'paid' or 'fulfilled'.""" | """Return only paid orders, so status in 'paid' or 'fulfilled'.""" | ||||
| return self.filter(status__in={'paid', 'fulfilled'}) | return self.filter(status__in={'paid', 'fulfilled'}) | ||||
| def payable(self) -> models.QuerySet: | def payable(self) -> 'models.QuerySet[Order]': | ||||
| """Return orders that can still be paid.""" | """Return orders that can still be paid.""" | ||||
| return self.filter(status__in={'created', 'soft-failed'}) | return self.filter(status__in={'created', 'soft-failed'}) | ||||
| # Ignoring type because of the inner Meta class of the two mix-ins. | # Ignoring type because of the inner Meta class of the two mix-ins. | ||||
| class Order( | class Order( | ||||
| model_mixins.RecordModifcationMixin, # type: ignore | model_mixins.RecordModifcationMixin, # type: ignore | ||||
| model_mixins.CreatedUpdatedMixin, | model_mixins.CreatedUpdatedMixin, | ||||
| ▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | ): | ||||
| paid_at = models.DateTimeField( | paid_at = models.DateTimeField( | ||||
| null=True, blank=True, help_text='When the order was paid, if at all.' | null=True, blank=True, help_text='When the order was paid, if at all.' | ||||
| ) | ) | ||||
| retry_after = models.DateTimeField( | retry_after = models.DateTimeField( | ||||
| null=True, blank=True, help_text='When automatic collection should be retried.' | null=True, blank=True, help_text='When automatic collection should be retried.' | ||||
| ) | ) | ||||
| objects = OrderManager() | objects: OrderManager = OrderManager() | ||||
| def latest_transaction(self) -> Optional['Transaction']: | def latest_transaction(self) -> Optional['Transaction']: | ||||
| """Returns the latest transaction for this Order, if there is one""" | """Returns the latest transaction for this Order, if there is one""" | ||||
| try: | try: | ||||
| return self.transaction_set.latest() | return self.transaction_set.latest() | ||||
| except Transaction.DoesNotExist: | except Transaction.DoesNotExist: | ||||
| return None | return None | ||||
| def save(self, *args, **kwargs) -> None: | def save(self, *args: Any, **kwargs: Any) -> None: | ||||
| """Act on state changes.""" | """Act on state changes.""" | ||||
| was_changed, old_state = self.pre_save_record() | was_changed, old_state = self.pre_save_record() | ||||
| old_status = old_state.get('status', '') | old_status = old_state.get('status', '') | ||||
| if ( | if ( | ||||
| was_changed | was_changed | ||||
| and old_status != self.status | and old_status != self.status | ||||
| and self.status == 'paid' | and self.status == 'paid' | ||||
| and self.paid_at is None | and self.paid_at is None | ||||
| ): | ): | ||||
| self.paid_at = django.utils.timezone.now() | self.paid_at = django.utils.timezone.now() | ||||
| update_fields: Optional[Set[str]] = kwargs.get('update_fields') | update_fields: UpdateFieldsType = kwargs.get('update_fields') | ||||
| if update_fields is not None: | if update_fields is not None: | ||||
| update_fields.add('paid_at') | update_fields.add('paid_at') | ||||
| super().save(*args, **kwargs) | super().save(*args, **kwargs) | ||||
| if was_changed: | if was_changed: | ||||
| self._handle_status_change(old_status=old_status) | self._handle_status_change(old_status=old_status) | ||||
| ▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def generate_transaction(self, *, save: bool = True) -> 'Transaction': | ||||
| paid=False, | paid=False, | ||||
| payment_method=self.payment_method, | payment_method=self.payment_method, | ||||
| ip_address=None, # will be set once the transaction is processed. | ip_address=None, # will be set once the transaction is processed. | ||||
| ) | ) | ||||
| if save: | if save: | ||||
| transaction.save() | transaction.save() | ||||
| return transaction | return transaction | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return f'Order {self.pk} for Subscription {self.subscription.pk}' | return f'Order {self.pk} for Subscription {self.subscription.pk}' | ||||
| def total_refunded(self) -> Money: | def total_refunded(self) -> Money: | ||||
| """Compute the total refunded amount for this order. | """Compute the total refunded amount for this order. | ||||
| Since an order can have multiple transactions that all have a refunded | Since an order can have multiple transactions that all have a refunded | ||||
| amount, we have to sum them all. | amount, we have to sum them all. | ||||
| Note that this function assumes that all transactions have the same | Note that this function assumes that all transactions have the same | ||||
| currency as the order. | currency as the order. | ||||
| """ | """ | ||||
| result = self.transaction_set.all().aggregate(models.Sum('amount_refunded')) | result = self.transaction_set.all().aggregate(models.Sum('amount_refunded')) | ||||
| # Prevent None, which happens when an order has no transactions. | # Prevent None, which happens when an order has no transactions. | ||||
| refunded_sum = result['amount_refunded__sum'] or 0 | refunded_sum = result['amount_refunded__sum'] or 0 | ||||
| return Money(self.currency, refunded_sum) | return Money(self.currency, refunded_sum) | ||||
| def attach_log_entry(self, message: str): | def attach_log_entry(self, message: str) -> None: | ||||
| """Attach an admin history log entry to this Order and its Subscription.""" | """Attach an admin history log entry to this Order and its Subscription.""" | ||||
| admin_log.attach_log_entry(self, message) | admin_log.attach_log_entry(self, message) | ||||
| admin_url = reverse('admin:looper_order_change', kwargs={'object_id': self.id}) | admin_url = reverse('admin:looper_order_change', kwargs={'object_id': self.id}) | ||||
| subs: Subscription = self.subscription # make PyCharm happy | subs: Subscription = self.subscription # make PyCharm happy | ||||
| subs.attach_log_entry( | subs.attach_log_entry( | ||||
| format_html('{} for <a href="{}">order #{}</a>', message, admin_url, self.id) | format_html('{} for <a href="{}">order #{}</a>', message, admin_url, self.id) | ||||
| ) | ) | ||||
| ▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | STATUSES = ( | ||||
| ('failed', 'Failed'), | ('failed', 'Failed'), | ||||
| ) | ) | ||||
| order = models.ForeignKey(Order, on_delete=models.CASCADE, null=True) | order = models.ForeignKey(Order, on_delete=models.CASCADE, null=True) | ||||
| payment_method = models.ForeignKey(PaymentMethod, on_delete=models.CASCADE) | payment_method = models.ForeignKey(PaymentMethod, on_delete=models.CASCADE) | ||||
| user = models.ForeignKey(User, on_delete=models.CASCADE) | user = models.ForeignKey(User, on_delete=models.CASCADE) | ||||
| status = models.CharField(choices=STATUSES, default=STATUS_DEFAULT, max_length=20) | status = models.CharField(choices=STATUSES, default=STATUS_DEFAULT, max_length=20) | ||||
| currency = CurrencyField() | currency: str = CurrencyField() | ||||
| amount: Money = MoneyField(default=0) | amount: Money = MoneyField(default=0) | ||||
| ip_address = models.GenericIPAddressField( | ip_address = models.GenericIPAddressField( | ||||
| null=True, help_text='IP address of the user at the moment of paying.' | null=True, help_text='IP address of the user at the moment of paying.' | ||||
| ) | ) | ||||
| # TODO(Sybren): investigate DB constraint that requires both NULL or neither NULL. | # TODO(Sybren): investigate DB constraint that requires both NULL or neither NULL. | ||||
| refunded_at = models.DateTimeField(null=True) | refunded_at = models.DateTimeField(null=True) | ||||
| amount_refunded: Money = MoneyField(blank=True, default=0, help_text='Refunded amount, if any.') | amount_refunded: Money = MoneyField(blank=True, default=0, help_text='Refunded amount, if any.') | ||||
| failure_message = models.TextField(blank=True, default='') | failure_message = models.TextField(blank=True, default='') | ||||
| paid = models.BooleanField() | paid = models.BooleanField() | ||||
| transaction_id = models.CharField( | transaction_id = models.CharField( | ||||
| max_length=128, blank=True, default='', help_text='ID of the transaction on the gateway.' | max_length=128, blank=True, default='', help_text='ID of the transaction on the gateway.' | ||||
| ) | ) | ||||
| @property | @property | ||||
| def refundable(self) -> Money: | def refundable(self) -> Money: | ||||
| return self.amount - self.amount_refunded | return self.amount - self.amount_refunded | ||||
| def __str__(self): | def __str__(self) -> str: | ||||
| return f'Transaction {self.pk} for order {self.order.pk}' | return f'Transaction {self.pk} for order {self.order.pk}' | ||||
| def save(self, *args, **kwargs) -> None: | def save(self, *args: Any, **kwargs: Any) -> None: | ||||
| """Act on state changes.""" | """Act on state changes.""" | ||||
| is_new = not self.pk | is_new = not self.pk | ||||
| super().save(*args, **kwargs) | super().save(*args, **kwargs) | ||||
| if is_new: | if is_new: | ||||
| self.log.debug('Transaction pk=%d was just created.', self.pk) | self.log.debug('Transaction pk=%d was just created.', self.pk) | ||||
| def mark_as_failed(self, failure_message: str): | def mark_as_failed(self, failure_message: str): | ||||
| ▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | def charge(self, *, customer_ip_address: Optional[str]) -> bool: | ||||
| # TODO(Sybren): shouldn't this be done in Transaction.save(), in response | # TODO(Sybren): shouldn't this be done in Transaction.save(), in response | ||||
| # to the status changing to 'succeeded'? | # to the status changing to 'succeeded'? | ||||
| self.order.status = 'paid' | self.order.status = 'paid' | ||||
| self.order.save(update_fields={'status'}) | self.order.save(update_fields={'status'}) | ||||
| return True | return True | ||||
| @requires_status('succeeded') | @requires_status('succeeded') | ||||
| def refund(self, amount: Money): | def refund(self, amount: Money) -> None: | ||||
| """Refund the customer. | """Refund the customer. | ||||
| :param amount: Amount to refund. May not be more than the total | :param amount: Amount to refund. May not be more than the total | ||||
| transaction amount minus the amount refunded so far. | transaction amount minus the amount refunded so far. | ||||
| :raises looper.exceptions.GatewayError: when the refund could not | :raises looper.exceptions.GatewayError: when the refund could not | ||||
| be performed at the payment gateway. | be performed at the payment gateway. | ||||
| """ | """ | ||||
| assert self.pk, 'Transaction needs to be saved before refunding' | assert self.pk, 'Transaction needs to be saved before refunding' | ||||
| assert isinstance(amount, Money), f'amount must be Money, not {amount!r}' | assert isinstance(amount, Money), f'amount must be Money, not {amount!r}' | ||||
| if amount.currency != self.currency: | if amount.currency != self.currency: | ||||
| raise ValueError( | raise ValueError( | ||||
| f'Refund currency {amount.currency!r} does not match ' | f'Refund currency {amount.currency!r} does not match ' | ||||
| 'transaction currency {self.currency!r}' | 'transaction currency {self.currency!r}' | ||||
| ) | ) | ||||
| try: | try: | ||||
| self.payment_method.gateway.provider.refund(self.transaction_id, amount) | self.payment_method.gateway.provider.refund(self.transaction_id, amount) | ||||
| except looper.exceptions.GatewayError as ex: | except looper.exceptions.GatewayError: | ||||
| raise | raise | ||||
| self.log.info( | self.log.info( | ||||
| 'Transaction pk=%d was succesful, storing transaction ID %r', | 'Transaction pk=%d was succesful, storing transaction ID %r', | ||||
| self.pk, | self.pk, | ||||
| self.transaction_id, | self.transaction_id, | ||||
| ) | ) | ||||
| self.refunded_at = django.utils.timezone.now() | self.refunded_at = django.utils.timezone.now() | ||||
| self.amount_refunded = self.amount_refunded + amount | self.amount_refunded = self.amount_refunded + amount | ||||
| self.save(update_fields={'refunded_at', 'amount_refunded'}) | self.save(update_fields={'refunded_at', 'amount_refunded'}) | ||||
| self.attach_log_entry(f'Refund of {amount} was successful') | self.attach_log_entry(f'Refund of {amount} was successful') | ||||
| def attach_log_entry(self, message: str): | def attach_log_entry(self, message: str) -> None: | ||||
| """Attach an admin history log entry to this Transaction and its Order + Subscription.""" | """Attach an admin history log entry to this Transaction and its Order + Subscription.""" | ||||
| admin_log.attach_log_entry(self, message) | admin_log.attach_log_entry(self, message) | ||||
| admin_url = reverse('admin:looper_transaction_change', kwargs={'object_id': self.id}) | admin_url = reverse('admin:looper_transaction_change', kwargs={'object_id': self.id}) | ||||
| order: Order = self.order # make PyCharm happy | order: Order = self.order # make PyCharm happy | ||||
| order.attach_log_entry( | order.attach_log_entry( | ||||
| format_html('{} for <a href="{}">transaction #{}</a>', message, admin_url, self.id) | format_html('{} for <a href="{}">transaction #{}</a>', message, admin_url, self.id) | ||||
| ) | ) | ||||
| # class Coupon(models.Model): | # class Coupon(models.Model): | ||||
| # """Discounts! Implement later.""" | # """Discounts! Implement later.""" | ||||
| # pass | # pass | ||||
| # | # | ||||
| # | # | ||||