Changeset View
Changeset View
Standalone View
Standalone View
looper/middleware.py
| from typing import Callable, Optional, TypeVar | |||||
| import functools | import functools | ||||
| import logging | import logging | ||||
| import threading | import threading | ||||
| import geoip2.database | |||||
| from django.conf import settings | from django.conf import settings | ||||
| from django.http import HttpRequest | |||||
| from django.http.response import HttpResponseBase | |||||
| from geoip2.errors import AddressNotFoundError | from geoip2.errors import AddressNotFoundError | ||||
| import geoip2.database | |||||
| import geoip2.records | |||||
| from looper import utils | from looper import utils | ||||
| PREFERRED_CURRENCY_SESSION_KEY = 'PREFERRED_CURRENCY' | PREFERRED_CURRENCY_SESSION_KEY = 'PREFERRED_CURRENCY' | ||||
| SKIP_GEO_LOOKUP = {'127.0.0.1', '::1', 'unknown', ''} | SKIP_GEO_LOOKUP = {'127.0.0.1', '::1', 'unknown', ''} | ||||
| """Addresses for which country lookups should be skipped.""" | """Addresses for which country lookups should be skipped.""" | ||||
| log = logging.getLogger(__name__) | log = logging.getLogger(__name__) | ||||
| GetResponseType = TypeVar('GetResponseType', bound=Callable[[HttpRequest], HttpResponseBase]) | |||||
| class PreferredCurrencyMiddleware: | class PreferredCurrencyMiddleware: | ||||
| """Expose preferred currency in request.session[PREFERRED_CURRENCY_SESSION_KEY]. | """Expose preferred currency in request.session[PREFERRED_CURRENCY_SESSION_KEY]. | ||||
| Very simple selector; if the country is in the European Union, we assume | Very simple selector; if the country is in the European Union, we assume | ||||
| the Euro is preferred, otherwise the US Dollar is chosen. | the Euro is preferred, otherwise the US Dollar is chosen. | ||||
| Uses the session, so install after django.contrib.sessions.middleware.SessionMiddleware. | Uses the session, so install after django.contrib.sessions.middleware.SessionMiddleware. | ||||
| """ | """ | ||||
| log = log.getChild('PreferredCurrencyMiddleware') | log = log.getChild('PreferredCurrencyMiddleware') | ||||
| def __init__(self, get_response) -> None: | def __init__(self, get_response: GetResponseType) -> None: | ||||
| self.log.info('Opening GeoIP2 database %s', settings.GEOIP2_DB) | self.log.info('Opening GeoIP2 database %s', settings.GEOIP2_DB) | ||||
| self.geoip = geoip2.database.Reader(settings.GEOIP2_DB) | self.geoip = geoip2.database.Reader(settings.GEOIP2_DB) | ||||
| self.get_response = get_response | self.get_response: GetResponseType = get_response | ||||
| def __call__(self, request): | def __call__(self, request: HttpRequest) -> HttpResponseBase: | ||||
| self.update_session(request) | self.update_session(request) | ||||
| return self.get_response(request) | return self.get_response(request) | ||||
| @functools.lru_cache(maxsize=128) | @functools.lru_cache(maxsize=128) | ||||
| def lookup_country(self, remote_addr: str): | def lookup_country(self, remote_addr: str) -> Optional[geoip2.records.Country]: | ||||
| if remote_addr in SKIP_GEO_LOOKUP: | if remote_addr in SKIP_GEO_LOOKUP: | ||||
| # We already know that the below call will fail; don't bother | # We already know that the below call will fail; don't bother | ||||
| # even trying to look up the address. | # even trying to look up the address. | ||||
| return None | return None | ||||
| try: | try: | ||||
| country = self.geoip.country(remote_addr) | country = self.geoip.country(remote_addr) | ||||
| except AddressNotFoundError: | except AddressNotFoundError: | ||||
| return None | return None | ||||
| except OSError as ex: | except OSError as ex: | ||||
| self.log.warning( | self.log.warning( | ||||
| 'Unable to look up remote address %r in GeoIP database: %s', remote_addr, ex | 'Unable to look up remote address %r in GeoIP database: %s', remote_addr, ex | ||||
| ) | ) | ||||
| return None | return None | ||||
| return country.country | return country.country | ||||
| def preferred_currency(self, country) -> str: | def preferred_currency(self, country: Optional[geoip2.records.Country]) -> str: | ||||
| """Determine default currency for the given country. | """Determine default currency for the given country. | ||||
| Unfortunately geoip2.models.Country isn't defined in a MyPy-compatible | Unfortunately geoip2.models.Country isn't defined in a MyPy-compatible | ||||
| way, so we can't properly declare its type. | way, so we can't properly declare its type. | ||||
| :type country: geoip2.models.Country | :type country: geoip2.models.Country | ||||
| """ | """ | ||||
| if country is None: | if country is None: | ||||
| return 'USD' | return 'USD' | ||||
| if country.is_in_european_union: | if country.is_in_european_union: | ||||
| return 'EUR' | return 'EUR' | ||||
| return 'USD' | return 'USD' | ||||
| def update_session(self, request): | def update_session(self, request: HttpRequest) -> None: | ||||
| if not hasattr(request, 'session'): | if not hasattr(request, 'session'): | ||||
| self.log.warning('Session middleware not available, unable to set preferred currency') | self.log.warning('Session middleware not available, unable to set preferred currency') | ||||
| return | return | ||||
| if PREFERRED_CURRENCY_SESSION_KEY in request.session: | if PREFERRED_CURRENCY_SESSION_KEY in request.session: | ||||
| # Preferred currency is already set; don't overwrite it. | # Preferred currency is already set; don't overwrite it. | ||||
| return | return | ||||
| remote_addr = utils.get_client_ip(request) | remote_addr = utils.get_client_ip(request) | ||||
| country = self.lookup_country(remote_addr) | country = self.lookup_country(remote_addr) | ||||
| currency = self.preferred_currency(country) | currency = self.preferred_currency(country) | ||||
| request.session[PREFERRED_CURRENCY_SESSION_KEY] = currency | request.session[PREFERRED_CURRENCY_SESSION_KEY] = currency | ||||