Changeset View
Changeset View
Standalone View
Standalone View
looper/admin/reports.py
- This file was added.
| from typing import Optional | |||||
| import json | |||||
| import logging | |||||
| from django.contrib import admin | |||||
| from django.contrib.admin.filters import AllValuesFieldListFilter | |||||
| from django.contrib.auth import get_user_model | |||||
| from django.core.serializers.json import DjangoJSONEncoder | |||||
| from django.db.models import Sum | |||||
| from django.db.models.functions import TruncDay | |||||
| from django.shortcuts import redirect | |||||
| from django.utils import timezone | |||||
| import django.db.models.lookups | |||||
| from .filters import ChoicesFieldNoAllListFilter | |||||
| from .mixins import NoChangeMixin, NoAddDeleteMixin, ExportCSVMixin | |||||
| from .models import EUVatPerCountryReport, OrdersPerVATNumber, Sales | |||||
| from looper.money import Money | |||||
| User = get_user_model() | |||||
| logger = logging.getLogger(__name__) | |||||
| class Report(NoChangeMixin, NoAddDeleteMixin, ExportCSVMixin, admin.ModelAdmin): | |||||
| list_display_links = None | |||||
| change_list_template = 'admin/looper/reports/report.html' | |||||
| def get_currency(self, queryset) -> Optional[str]: | |||||
| """Try to figure out the currency from the request.""" | |||||
| for lookup in queryset.query.where.children: | |||||
| if not isinstance(lookup, django.db.models.lookups.Exact): | |||||
| continue | |||||
| if not lookup.lhs.target.name == 'currency': | |||||
| continue | |||||
| return lookup.rhs | |||||
| def changelist_view(self, request, extra_context=None): | |||||
| """We can only report per currency, so make sure currency filter is on.""" | |||||
| referrer = request.META.get('HTTP_REFERER', '') | |||||
| year = timezone.now().year | |||||
| get_params = f'currency__exact=EUR&paid_at__year={year}' | |||||
| if len(request.GET) == 0 and '?' not in referrer: | |||||
| return redirect(f'{request.path}?{get_params}') | |||||
| return super().changelist_view(request, extra_context=extra_context) | |||||
| def get_report_lines(self, queryset): | |||||
| """Return a list of proxy models that will be displayed instead of the changelist.""" | |||||
| raise NotImplementedError('Report must implement its own get_report_lines method') | |||||
| def get_changelist_instance(self, request): | |||||
| """Monkey-patch changelist replacing the queryset with the report lines.""" | |||||
| try: | |||||
| cl = super().get_changelist_instance(request) | |||||
| report = self.get_report_lines(cl.queryset) | |||||
| cl.result_list = report | |||||
| cl.result_count = len(report) | |||||
| cl.can_show_all = True | |||||
| cl.multi_page = False | |||||
| cl.title = cl.opts.verbose_name_plural | |||||
| cl.date_hierarchy_with_quarters = getattr(self, 'date_hierarchy_with_quarters', False) | |||||
| # cl.all_filters = self.get_all_filters(cl.queryset) | |||||
| return cl | |||||
| except Exception as e: | |||||
| logger.exception(e) | |||||
| raise | |||||
| @admin.register(EUVatPerCountryReport) | |||||
| class EUVatPerCountryReportAdmin(Report): | |||||
| list_display = ( | |||||
| 'country', | |||||
| 'country_code', | |||||
| 'tax_rate', | |||||
| 'sales', | |||||
| 'refunds', | |||||
| 'vat_charged', | |||||
| 'vat_refunded', | |||||
| 'total_charged', | |||||
| 'vat_total', | |||||
| ) | |||||
| list_filter = ('is_legacy', ('currency', ChoicesFieldNoAllListFilter)) | |||||
| ordering = ('tax_country',) | |||||
| date_hierarchy = 'paid_at' | |||||
| date_hierarchy_with_quarters = True | |||||
| def get_queryset(self, request): | |||||
| """Apply additional filters to the report's queryset.""" | |||||
| queryset = super().get_queryset(request) | |||||
| return queryset.filter(paid_at__isnull=False, tax__gt=0) | |||||
| def get_report_lines(self, queryset): | |||||
| """Return a list of proxy models that will be displayed instead of the changelist.""" | |||||
| currency = self.get_currency(queryset) | |||||
| per_country = {} | |||||
| for order in queryset.all(): | |||||
| key = (order.tax_country, order.tax_rate, order.currency) | |||||
| if key not in per_country: | |||||
| per_country[key] = [] | |||||
| row = per_country[key] | |||||
| row.append(order) | |||||
| report_lines = [ | |||||
| EUVatPerCountryReport( | |||||
| tax_country=key[0], | |||||
| tax_rate=key[1], | |||||
| sales=sum((_.price for _ in orders), start=Money(currency, 0)), | |||||
| refunds=sum((_.refunded for _ in orders), start=Money(currency, 0)), | |||||
| vat_charged=sum((_.tax for _ in orders), start=Money(currency, 0)), | |||||
| vat_refunded=sum((_.tax_refunded for _ in orders), start=Money(currency, 0)), | |||||
| ) | |||||
| for key, orders in per_country.items() | |||||
| ] | |||||
| report_lines.append( | |||||
| EUVatPerCountryReport( | |||||
| tax_country='Total', | |||||
| tax_rate=None, | |||||
| sales=sum((_.sales for _ in report_lines), start=Money(currency, 0)), | |||||
| refunds=sum((_.refunds for _ in report_lines), start=Money(currency, 0)), | |||||
| vat_charged=sum((_.vat_charged for _ in report_lines), start=Money(currency, 0)), | |||||
| vat_refunded=sum((_.vat_refunded for _ in report_lines), start=Money(currency, 0)), | |||||
| ) | |||||
| ) | |||||
| return report_lines | |||||
| @admin.register(OrdersPerVATNumber) | |||||
| class OrdersPerVATNumberAdmin(Report): | |||||
| list_display = ('order_id', 'country', 'vat_number', 'sales') | |||||
| list_filter = ( | |||||
| 'is_legacy', | |||||
| ('currency', ChoicesFieldNoAllListFilter), | |||||
| ('tax_country', AllValuesFieldListFilter), | |||||
| ) | |||||
| ordering = ('tax_country', 'vat_number') | |||||
| date_hierarchy = 'paid_at' | |||||
| date_hierarchy_with_quarters = True | |||||
| def get_queryset(self, request): | |||||
| """Apply additional filters to the report's queryset.""" | |||||
| queryset = super().get_queryset(request) | |||||
| return ( | |||||
| queryset.filter( | |||||
| paid_at__isnull=False, | |||||
| vat_number__isnull=False, | |||||
| ) | |||||
| .exclude(vat_number='') | |||||
| .order_by('vat_number', 'pk') | |||||
| ) | |||||
| def get_report_lines(self, queryset): | |||||
| """Return a list of proxy models that will be displayed instead of the changelist.""" | |||||
| currency = self.get_currency(queryset) | |||||
| per_vat_number = {} | |||||
| for order in queryset.all(): | |||||
| key = (order.tax_country, order.vat_number) | |||||
| if key not in per_vat_number: | |||||
| per_vat_number[key] = [] | |||||
| row = per_vat_number[key] | |||||
| row.append(order) | |||||
| report_lines = [ | |||||
| OrdersPerVATNumber( | |||||
| tax_country=key[0], | |||||
| vat_number=key[1], | |||||
| sales=sum((_.price for _ in orders), start=Money(currency, 0)), | |||||
| orders=orders, | |||||
| ) | |||||
| for key, orders in per_vat_number.items() | |||||
| ] | |||||
| report_lines.append( | |||||
| OrdersPerVATNumber( | |||||
| tax_country='Total', | |||||
| vat_number='', | |||||
| sales=sum((_.sales for _ in report_lines), start=Money(currency, 0)), | |||||
| orders=[], | |||||
| ) | |||||
| ) | |||||
| return report_lines | |||||
| @admin.register(Sales) | |||||
| class SalesAdmin(Report): | |||||
| list_display = ( | |||||
| 'gross_sales', | |||||
| 'avg_daily_gross_sales', | |||||
| 'net_sales', | |||||
| 'avg_daily_net_sales', | |||||
| 'orders_placed', | |||||
| 'refunds', | |||||
| 'orders_refunded', | |||||
| ) | |||||
| list_filter = ( | |||||
| 'is_legacy', | |||||
| ('currency', ChoicesFieldNoAllListFilter), | |||||
| 'paid_at', | |||||
| ('tax_country', AllValuesFieldListFilter), | |||||
| ) | |||||
| date_hierarchy = 'paid_at' | |||||
| date_hierarchy_with_quarters = True | |||||
| change_list_template = 'admin/looper/reports/sales.html' | |||||
| def get_form(self, request, obj=None, **kwargs): | |||||
| """Add help texts to the report's fields. Currently unused.""" | |||||
| help_texts = { | |||||
| 'gross_sales': 'This is the sum of the order totals' | |||||
| ' after any refunds and including shipping and taxes.', | |||||
| 'net_sales': 'This is the sum of the order totals' | |||||
| ' after any refunds and excluding shipping and taxes.', | |||||
| } | |||||
| kwargs.update({'help_texts': help_texts}) | |||||
| return super().get_form(request, obj, **kwargs) | |||||
| def get_queryset(self, request): | |||||
| """Apply additional filters to the report's queryset.""" | |||||
| queryset = super().get_queryset(request) | |||||
| return queryset.filter(paid_at__isnull=False) | |||||
| def get_report_lines(self, queryset): | |||||
| """Return a list of proxy models that will be displayed instead of the changelist.""" | |||||
| currency = self.get_currency(queryset) | |||||
| per_date = {} | |||||
| for order in queryset.all(): | |||||
| key = order.paid_at.date() | |||||
| if key not in per_date: | |||||
| per_date[key] = [] | |||||
| row = per_date[key] | |||||
| row.append(order) | |||||
| report_lines = [ | |||||
| Sales( | |||||
| paid_at=key, | |||||
| gross_sales=sum((_.price for _ in orders), start=Money(currency, 0)), | |||||
| net_sales=sum((_.price - _.refunded for _ in orders), start=Money(currency, 0)), | |||||
| refunds=sum((_.refunded for _ in orders), start=Money(currency, 0)), | |||||
| orders_placed=len(orders), | |||||
| orders_refunded=len([_ for _ in orders if _.refunded_at is not None]), | |||||
| ) | |||||
| for key, orders in per_date.items() | |||||
| ] | |||||
| total = Sales( | |||||
| paid_at=None, | |||||
| gross_sales=sum((_.gross_sales for _ in report_lines), start=Money(currency, 0)), | |||||
| net_sales=sum((_.net_sales for _ in report_lines), start=Money(currency, 0)), | |||||
| refunds=sum((_.refunds for _ in report_lines), start=Money(currency, 0)), | |||||
| orders_placed=sum((_.orders_placed for _ in report_lines)), | |||||
| orders_refunded=sum((_.orders_refunded for _ in report_lines)), | |||||
| days=len(report_lines), | |||||
| ) | |||||
| # N.B.: for timestamps/decimation: x=Extract(TruncDay(self.date_hierarchy), 'epoch') | |||||
| # Set up chart data using total sales (gross sales: order prices tax incl.) | |||||
| chart_data = ( | |||||
| queryset.annotate(date=TruncDay(self.date_hierarchy)) | |||||
| .values('date') | |||||
| .annotate(y=Sum('price') / 100) | |||||
| .order_by('-date') | |||||
| ) | |||||
| self.chart = { | |||||
| 'data': json.dumps(list(chart_data), cls=DjangoJSONEncoder), | |||||
| 'label': 'Gross sales', | |||||
| 'aggregate_by': self.date_hierarchy, | |||||
| 'aggregate_by_prefix': Money(currency, 0).currency_symbol, | |||||
| } | |||||
| return [total] | |||||
| def get_changelist_instance(self, request): | |||||
| """Make chart data accessible via ChangeList.""" | |||||
| cl = super().get_changelist_instance(request) | |||||
| cl.chart = self.chart | |||||
| return cl | |||||