Changeset View
Standalone View
looper/models.py
| import datetime | from decimal import Decimal | ||||
| import logging | |||||
| from typing import ( | from typing import ( | ||||
| Any, | Any, | ||||
| Dict, | Dict, | ||||
| Iterable, | Iterable, | ||||
| Mapping, | Mapping, | ||||
| Optional, | Optional, | ||||
| Set, | Set, | ||||
| Tuple, | Tuple, | ||||
| Type, | Type, | ||||
| Union, | Union, | ||||
| cast, | cast, | ||||
| ) | ) | ||||
| import datetime | |||||
| import logging | |||||
| from django.contrib.auth import get_user_model | from django.contrib.auth import get_user_model | ||||
| from django.db import models | from django.db import models | ||||
| from django.db.models import Model # noqa: F401 | from django.db.models import Model # noqa: F401 | ||||
| from django.urls import reverse | from django.urls import reverse | ||||
| from django.utils.html import format_html | from django.utils.html import format_html | ||||
| from django_countries.fields import CountryField | from django_countries.fields import CountryField | ||||
| import dateutil.relativedelta | import dateutil.relativedelta | ||||
| import django.utils.timezone | import django.utils.timezone | ||||
| from looper import admin_log, form_fields, gateways, model_mixins | from looper import admin_log, form_fields, gateways, model_mixins | ||||
| from looper.decorators import requires_status | from looper.decorators import requires_status | ||||
| from looper.money import Money, CurrencyMismatch | from looper.money import Money, CurrencyMismatch | ||||
| from looper.taxes import TaxType, ProductType, Taxable | |||||
| import looper.exceptions | import looper.exceptions | ||||
| import looper.signals | import looper.signals | ||||
| User = get_user_model() | User = get_user_model() | ||||
| DEFAULT_CURRENCY = 'EUR' | DEFAULT_CURRENCY = 'EUR' | ||||
| CURRENCIES = ( | CURRENCIES = ( | ||||
| ('USD', 'USD'), | ('USD', 'USD'), | ||||
| ('EUR', 'EUR'), | ('EUR', 'EUR'), | ||||
| ▲ Show 20 Lines • Show All 251 Lines • ▼ Show 20 Lines | ) -> 'PaymentMethod': | ||||
| ) | ) | ||||
| return paymeth | return paymeth | ||||
| def __str__(self) -> str: | def __str__(self) -> str: | ||||
| if not self.full_name: | if not self.full_name: | ||||
| return f'Customer {self.pk}' | return f'Customer {self.pk}' | ||||
| return self.full_name | return self.full_name | ||||
| def get_tax(self, product_type: str) -> Tuple[TaxType, Decimal]: | |||||
| """Calculate tax type and rate for this customer and the given product type.""" | |||||
| billing_address: Address = self.billing_address | |||||
| # At this point we assume that VATIN has been validated | |||||
| is_business = bool(self.vat_number) | |||||
| tax_type, tax_rate = ProductType(product_type).get_tax( | |||||
| buyer_country_code=billing_address.country, | |||||
| is_business=is_business, | |||||
| ) | |||||
| return tax_type, tax_rate | |||||
| class Gateway(models.Model): | class Gateway(models.Model): | ||||
| """Payment gateway data. | """Payment gateway data. | ||||
| Links Customers to a specific payment gateway. | Links Customers to a specific payment gateway. | ||||
| """ | """ | ||||
| GATEWAY_CHOICES = [(name, name.title()) for name in gateways.Registry.gateway_names()] | GATEWAY_CHOICES = [(name, name.title()) for name in gateways.Registry.gateway_names()] | ||||
| ▲ Show 20 Lines • Show All 254 Lines • ▼ Show 20 Lines | def as_text(self) -> str: | ||||
| return '\n'.join(to_str(elem) for elem in self.public_values if elem) | return '\n'.join(to_str(elem) for elem in self.public_values if elem) | ||||
| class Product(model_mixins.CreatedUpdatedMixin, models.Model): | class Product(model_mixins.CreatedUpdatedMixin, models.Model): | ||||
| """Can be a good or a service, like 'Development Fund'.""" | """Can be a good or a service, like 'Development Fund'.""" | ||||
| name = models.CharField(max_length=200) | name = models.CharField(max_length=200) | ||||
| type = models.CharField( | |||||
| choices=ProductType.as_choices(), | |||||
| default=ProductType.DONATION.value, | |||||
| max_length=20, | |||||
| help_text='The nature and type of goods or services supplied (for tax administration)', | |||||
| ) | |||||
| def __str__(self) -> str: | def __str__(self) -> str: | ||||
| return self.name | return self.name | ||||
| class Plan(models.Model): | class Plan(models.Model): | ||||
| """Acts as a variation of the product, like 'Gold' for 'Development Fund'.""" | """Acts as a variation of the product, like 'Gold' for 'Development Fund'.""" | ||||
| ▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def price_per_month(self) -> Money: | ||||
| 'day': 12 / 365, # Approximate average nr of months in a day. | 'day': 12 / 365, # Approximate average nr of months in a day. | ||||
| 'week': 12 / 52, # Approximate average nr of months in a week. | 'week': 12 / 52, # Approximate average nr of months in a week. | ||||
| 'month': 1, | 'month': 1, | ||||
| 'year': 12, | 'year': 12, | ||||
| } | } | ||||
| months_per_interval = months[self.interval_unit] * self.interval_length | months_per_interval = months[self.interval_unit] * self.interval_length | ||||
| return self.price // months_per_interval | return self.price // months_per_interval | ||||
| @property | |||||
| def interval(self) -> dateutil.relativedelta.relativedelta: | |||||
| """Return the renewal interval as relativedelta. | |||||
| The relativedelta can be added to regular `datetime.datetime` objects. | |||||
| """ | |||||
| kwargs = {f"{self.interval_unit}s": self.interval_length} | |||||
| return dateutil.relativedelta.relativedelta(**kwargs) # type: ignore | |||||
| @property | |||||
| def first_renewal(self) -> datetime.datetime: | |||||
| """Return the date of first renewal, considering now as the activation date.""" | |||||
| now = django.utils.timezone.now() | |||||
| return now + self.interval | |||||
| def __str__(self) -> str: | def __str__(self) -> str: | ||||
| return f"{self.plan.name} - {self.currency} - {self.interval_length} {self.interval_unit}" | return f"{self.plan.name} - {self.currency} - {self.interval_length} {self.interval_unit}" | ||||
| def __repr__(self) -> str: | def __repr__(self) -> str: | ||||
| return ( | return ( | ||||
| f'PlanVariation(pk={self.pk}, plan.name={self.plan.name!r}, price={self.price!r}, ' | f'PlanVariation(pk={self.pk}, plan.name={self.plan.name!r}, price={self.price!r}, ' | ||||
| f'interval_unit={self.interval_unit!r}, interval_length={self.interval_length}, ' | f'interval_unit={self.interval_unit!r}, interval_length={self.interval_length}, ' | ||||
| f'collection_method={self.collection_method})' | f'collection_method={self.collection_method})' | ||||
| Show All 30 Lines | class SharedFields(models.Model): | ||||
| ) | ) | ||||
| currency: str = CurrencyField() | currency: str = CurrencyField() | ||||
| price: Money = MoneyField(default=0, help_text='Including tax.') | price: Money = MoneyField(default=0, help_text='Including tax.') | ||||
| collection_method = models.CharField( | collection_method = models.CharField( | ||||
| choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | choices=COLLECTION_METHODS, default=DEFAULT_COLLECTION_METHOD, max_length=20 | ||||
| ) | ) | ||||
| tax: Money = MoneyField(blank=True, default=0) | TAX_FIELDS = {'tax', 'tax_type', 'tax_country', 'tax_rate'} | ||||
| tax_type = models.CharField(max_length=20, blank=True, default='') | tax: Money = MoneyField( | ||||
| tax_region = CountryField(blank=True, default='') | blank=True, | ||||
| default=0, | |||||
| help_text='Tax amount charged at the moment of sale.', | |||||
| ) | |||||
| tax_type = models.CharField( | |||||
| choices=TaxType.as_choices(), max_length=20, default=TaxType.NO_CHARGE.value, blank=True | |||||
| ) | |||||
| tax_country = CountryField(blank=True, default='') | |||||
| tax_rate = models.DecimalField( | |||||
| max_digits=5, | |||||
| decimal_places=2, | |||||
| null=False, | |||||
| default=Decimal(0), | |||||
| blank=True, | |||||
| help_text='Tax rate percentage applicable at the moment of sale. ' | |||||
| 'Stored regardless of whether the tax was charged or not ' | |||||
| '(e.g. due to exemption or reverse-charge).', | |||||
| ) | |||||
| @classmethod | @classmethod | ||||
| def field_names(cls) -> Iterable[str]: | def field_names(cls) -> Set[str]: | ||||
| return (f.name for f in cls._meta.fields) | return {f.name for f in cls._meta.fields} | ||||
| @property | |||||
| def taxable(self) -> Taxable: | |||||
sybren: Add `-> Taxable` so that MyPy knows it's a typed function. | |||||
| """Return tax data for this object.""" | |||||
| return Taxable(self.price, tax_type=TaxType(self.tax_type), tax_rate=self.tax_rate) | |||||
| class SubscriptionManager(models.Manager): | class SubscriptionManager(models.Manager): | ||||
| def payable(self) -> 'models.QuerySet[Subscription]': | def payable(self) -> 'models.QuerySet[Subscription]': | ||||
| """Return subscriptions that can still be paid for.""" | """Return subscriptions that can still be paid for.""" | ||||
| return self.filter(status__in={'active', 'on-hold'}) | return self.filter(status__in={'active', 'on-hold'}) | ||||
| def active(self) -> 'models.QuerySet[Subscription]': | def active(self) -> 'models.QuerySet[Subscription]': | ||||
| ▲ Show 20 Lines • Show All 179 Lines • ▼ Show 20 Lines | def latest_order(self) -> Optional['Order']: | ||||
| return self.order_set.latest() | return self.order_set.latest() | ||||
| except Order.DoesNotExist: | except Order.DoesNotExist: | ||||
| return None | return None | ||||
| def save(self, *args: Any, update_fields: UpdateFieldsType = None, **kwargs: Any) -> None: | def save(self, *args: Any, update_fields: UpdateFieldsType = None, **kwargs: Any) -> None: | ||||
| """Act on state changes.""" | """Act on state changes.""" | ||||
| was_changed, old_state = self.pre_save_record() | was_changed, old_state = self.pre_save_record() | ||||
| self._handle_tax_change_pre_save(old_state, update_fields) | |||||
| if was_changed: | if was_changed: | ||||
| self._handle_status_change_pre_save(old_state, update_fields) | self._handle_status_change_pre_save(old_state, update_fields) | ||||
| super().save(*args, update_fields=update_fields, **kwargs) | super().save(*args, update_fields=update_fields, **kwargs) | ||||
| if old_state.get('user_id') != self.user_id: | if old_state.get('user_id') != self.user_id: | ||||
| self._handle_user_changed(old_state.get('user_id')) | self._handle_user_changed(old_state.get('user_id')) | ||||
| if was_changed: | if was_changed: | ||||
| self._handle_status_change_post_save(old_state) | self._handle_status_change_post_save(old_state) | ||||
| def _get_taxable_from_old_state(self, old_state: Mapping[str, Any]) -> Optional[Taxable]: | |||||
| old_price = old_state.get('price') | |||||
| if not old_price: | |||||
| return None | |||||
| old_tax_type = old_state.get('tax_type', TaxType.NO_CHARGE.value) | |||||
| old_tax_rate = old_state.get('tax_rate', Decimal(0)) | |||||
| return Taxable(old_price, tax_type=TaxType(old_tax_type), tax_rate=old_tax_rate) | |||||
| def _handle_status_change_pre_save( | def _handle_status_change_pre_save( | ||||
| self, old_state: Mapping[str, Any], update_fields: UpdateFieldsType | self, old_state: Mapping[str, Any], update_fields: UpdateFieldsType | ||||
| ) -> None: | ) -> None: | ||||
| """Handle status changes that should be reflected in the to-be-saved instance. | """Handle status changes that should be reflected in the to-be-saved instance. | ||||
| :param old_state: The current state in the database (which is about to be overwritten). | :param old_state: The current state in the database (which is about to be overwritten). | ||||
| :param update_fields: The `update_fields` param passed to the save() method. If you | :param update_fields: The `update_fields` param passed to the save() method. If you | ||||
| change fields in `self` that should be persisted, be sure to update this in-place. | change fields in `self` that should be persisted, be sure to update this in-place. | ||||
| """ | """ | ||||
| old_status = old_state.get('status', '') | old_status = old_state.get('status', '') | ||||
| if old_status == self.status: | if old_status == self.status: | ||||
| return | return | ||||
| was_active = old_state.get('is_active', False) | was_active = old_state.get('is_active', False) | ||||
| if self.is_active and not was_active: | if self.is_active and not was_active: | ||||
| self._on_activation_pre_save(old_status, update_fields) | self._on_activation_pre_save(old_status, update_fields) | ||||
| elif not self.is_active and was_active: | elif not self.is_active and was_active: | ||||
| self._on_deactivation_pre_save(old_status, update_fields) | self._on_deactivation_pre_save(old_status, update_fields) | ||||
| if self.status == 'cancelled': | if self.status == 'cancelled': | ||||
| self._on_cancelled(update_fields) | self._on_cancelled(update_fields) | ||||
| def _handle_tax_change_pre_save( | |||||
| self, old_state: Mapping[str, Any], update_fields: UpdateFieldsType | |||||
| ) -> None: | |||||
| """Handle tax changes that should be reflected in the to-be-saved instance. | |||||
| :param old_state: The current state in the database (which is about to be overwritten). | |||||
| :param update_fields: The `update_fields` param passed to the save() method. If you | |||||
| change fields in `self` that should be persisted, be sure to update this in-place. | |||||
| """ | |||||
| old_taxable = self._get_taxable_from_old_state(old_state) | |||||
| new_taxable = self.taxable | |||||
| if old_taxable == new_taxable: | |||||
| return | |||||
| self.tax = new_taxable.charged_tax | |||||
Done Inline ActionsConstructing a Taxable from old_state could probably be its own function as well. sybren: Constructing a `Taxable` from `old_state` could probably be its own function as well. | |||||
| if update_fields is not None: | |||||
Done Inline ActionsConstruction of a Taxable given self is something that happens more than once. It'll be nice to just have a function for this. sybren: Construction of a `Taxable` given `self` is something that happens more than once. It'll be… | |||||
| update_fields.add('tax') | |||||
Done Inline ActionsThis can be flipped to if old_taxable == new_taxable: return, so that the actual tax-handling code is not in a conditional block sybren: This can be flipped to `if old_taxable == new_taxable: return`, so that the actual tax-handling… | |||||
| def _handle_user_changed(self, old_user_id: Optional[int]) -> None: | def _handle_user_changed(self, old_user_id: Optional[int]) -> None: | ||||
| """Reset the payment method.""" | """Reset the payment method.""" | ||||
| if old_user_id is None: | if old_user_id is None: | ||||
Done Inline ActionsThis looks like it should be a function on Taxable, so that you can do self.tax = new_taxable.charged_tax() or something along those lines. old_taxable = self.taxable_from_old_state(old_state)
new_taxable = self.taxable()
if old_taxable == new_taxable:
return
self.tax = new_taxable.charged_tax_amount()
if update_fields is not None:
update_fields.add('tax')sybren: This looks like it should be a function on `Taxable`, so that you can do `self.tax =… | |||||
| return | return | ||||
| new_user_id = self.user_id | new_user_id = self.user_id | ||||
| self.log.info( | self.log.info( | ||||
| 'Subscription %d changed user from %d to %s', self.pk, old_user_id, new_user_id | 'Subscription %d changed user from %d to %s', self.pk, old_user_id, new_user_id | ||||
| ) | ) | ||||
| for order in self.order_set.all(): | for order in self.order_set.all(): | ||||
| ▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def _on_deactivation_post_save(self, old_status: str) -> None: | ||||
| self.log.debug('Subscription pk=%d was deactivated', self.pk) | self.log.debug('Subscription pk=%d was deactivated', self.pk) | ||||
| looper.signals.subscription_deactivated.send(sender=self, old_status=old_status) | looper.signals.subscription_deactivated.send(sender=self, old_status=old_status) | ||||
| def generate_order(self, *, save: bool = True) -> 'Order': | def generate_order(self, *, save: bool = True) -> 'Order': | ||||
| """Generate an order for the current subscription. | """Generate an order for the current subscription. | ||||
| :param save: save the generated order to the database. | :param save: save the generated order to the database. | ||||
| """ | """ | ||||
| # Dynamically get the fields to copy. | # Dynamically get the fields to copy. | ||||
| field_copies = {name: getattr(self, name) for name in SharedFields.field_names()} | field_copies = {name: getattr(self, name) for name in SharedFields.field_names()} | ||||
| # Final amount presented to customer and charged is affected by the tax | |||||
Done Inline ActionsThis could use the taxable = self.taxable() function mentioned above. sybren: This could use the `taxable = self.taxable()` function mentioned above. | |||||
| # and can differ from the Subscription.price | |||||
| field_copies['price'] = self.taxable.price | |||||
| customer: Customer = self.user.customer | customer: Customer = self.user.customer | ||||
| order = Order( | order = Order( | ||||
| subscription=self, | subscription=self, | ||||
| email=customer.billing_email, | email=customer.billing_email, | ||||
| billing_address=customer.billing_address.as_text(), | billing_address=customer.billing_address.as_text(), | ||||
| vat_number=customer.vat_number, | |||||
| status='created', | status='created', | ||||
| name=f'{self.plan.product.name} / {self.plan.name}', | name=f'{self.plan.product.name} / {self.plan.name}', | ||||
| **field_copies, | **field_copies, | ||||
| ) | ) | ||||
| if save: | if save: | ||||
| order.save() | order.save() | ||||
| return order | return order | ||||
| ▲ Show 20 Lines • Show All 105 Lines • ▼ Show 20 Lines | def switch_payment_method(self, payment_method: PaymentMethod) -> None: | ||||
| self.pk, | self.pk, | ||||
| self.collection_method, | self.collection_method, | ||||
| new_method, | new_method, | ||||
| supported, | supported, | ||||
| ) | ) | ||||
| self.collection_method = new_method | self.collection_method = new_method | ||||
| self.save(update_fields={'payment_method', 'collection_method'}) | self.save(update_fields={'payment_method', 'collection_method'}) | ||||
| def update_tax(self, save=True): | |||||
| """Update tax type and rate, based on the plan and customer's billing details. | |||||
| It's up to the Django project to call this method when it's appropriate to update | |||||
Done Inline ActionsThere are so many possible uses of the word "project" that it could use some specification here. I'm guessing it's referring to the Django project? sybren: There are so many possible uses of the word "project" that it could use some specification here. | |||||
| a Subscription's tax fields. | |||||
| It could be triggered by a signal, called by a view or a cronjob | |||||
| checking if a Customer's billing details are still valid. | |||||
| """ | |||||
| customer: Customer = self.user.customer | |||||
| billing_address: Address = customer.billing_address | |||||
| self.tax_country = billing_address.country | |||||
| product_type = self.plan.product.type | |||||
| tax_type, tax_rate = customer.get_tax(product_type=product_type) | |||||
| self.tax_type = tax_type.value | |||||
| self.tax_rate = tax_rate | |||||
| if save: | |||||
| self.save(update_fields=self.TAX_FIELDS) | |||||
| class OrderManager(models.Manager): | class OrderManager(models.Manager): | ||||
| def paid(self) -> 'models.QuerySet[Order]': | def paid(self) -> 'models.QuerySet[Order]': | ||||
| """Return only paid orders, so status in 'paid' or 'fulfilled'.""" | """Return only paid orders, so status in 'paid' or 'fulfilled'.""" | ||||
| return self.filter(status__in={'paid', 'fulfilled'}) | return self.filter(status__in={'paid', 'fulfilled'}) | ||||
| def payable(self) -> 'models.QuerySet[Order]': | def payable(self) -> 'models.QuerySet[Order]': | ||||
| """Return orders that can still be paid.""" | """Return orders that can still be paid.""" | ||||
| Show All 40 Lines | ): | ||||
| subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE, editable=False) | subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE, editable=False) | ||||
| name = models.CharField( | name = models.CharField( | ||||
| max_length=255, help_text='What the order is for; shown to the customer' | max_length=255, help_text='What the order is for; shown to the customer' | ||||
| ) | ) | ||||
| email = models.EmailField(max_length=255) # Snapshot of the current email | email = models.EmailField(max_length=255) # Snapshot of the current email | ||||
| billing_address = models.TextField() # Snapshot of the current address | billing_address = models.TextField() # Snapshot of the current address | ||||
| vat_number = models.CharField( # Snapshot of the current VATIN, if supplied | |||||
| max_length=255, blank=True, default='', verbose_name='VAT identification number' | |||||
| ) | |||||
| status = models.CharField(choices=STATUSES, default=DEFAULT_STATUS, max_length=20) | status = models.CharField(choices=STATUSES, default=DEFAULT_STATUS, max_length=20) | ||||
| collection_attempts = models.IntegerField( | collection_attempts = models.IntegerField( | ||||
| default=0, | default=0, | ||||
| blank=True, | blank=True, | ||||
| editable=False, | editable=False, | ||||
| help_text='How often an automatic collection attempt was made.', | help_text='How often an automatic collection attempt was made.', | ||||
| ) | ) | ||||
| ▲ Show 20 Lines • Show All 177 Lines • ▼ Show 20 Lines | def switch_payment_method(self, payment_method: PaymentMethod) -> None: | ||||
| self.pk, | self.pk, | ||||
| self.collection_method, | self.collection_method, | ||||
| new_method, | new_method, | ||||
| supported, | supported, | ||||
| ) | ) | ||||
| self.collection_method = new_method | self.collection_method = new_method | ||||
| self.save(update_fields={'payment_method', 'collection_method'}) | self.save(update_fields={'payment_method', 'collection_method'}) | ||||
| @requires_status('created', 'soft-failed', 'failed') | |||||
| def update(self): | |||||
| """Re-generate order copying all subscription values and updating the price. | |||||
| This method must be called explicitly when necessary, e.g.: | |||||
| when billing details are updated, affecting a yet unpaid order. | |||||
| """ | |||||
| order_tmp = self.subscription.generate_order(save=False) | |||||
| # Setting Money values will fail in case currency is changing unless it's updated first | |||||
| self.currency = order_tmp.currency | |||||
| for field in self._meta.fields: | |||||
| setattr(self, field.name, getattr(order_tmp, field.name)) | |||||
| self.save() | |||||
Done Inline Actionsdel is unnecessary here, as order_tmp is a local name and will go out of scope one line later. sybren: `del` is unnecessary here, as `order_tmp` is a local name and will go out of scope one line… | |||||
| class Transaction(model_mixins.CreatedUpdatedMixin, models.Model): | class Transaction(model_mixins.CreatedUpdatedMixin, models.Model): | ||||
| """Transaction that can be charged or refunded.""" | """Transaction that can be charged or refunded.""" | ||||
| log = log.getChild('Transaction') | log = log.getChild('Transaction') | ||||
| class Meta: | class Meta: | ||||
| # We just want to order by creation time, which is reflected in the | # We just want to order by creation time, which is reflected in the | ||||
| ▲ Show 20 Lines • Show All 184 Lines • Show Last 20 Lines | |||||
Add -> Taxable so that MyPy knows it's a typed function.