black on views.py

This commit is contained in:
Yohan Boniface 2023-02-27 11:00:33 +01:00
parent 97e2df0a8d
commit 3aa34d124e

View file

@ -50,11 +50,13 @@ except ImportError:
User = get_user_model()
PRIVATE_IP = re.compile(r'((^127\.)|(^10\.)'
r'|(^172\.1[6-9]\.)'
r'|(^172\.2[0-9]\.)'
r'|(^172\.3[0-1]\.)'
r'|(^192\.168\.))')
PRIVATE_IP = re.compile(
r"((^127\.)|(^10\.)"
r"|(^172\.1[6-9]\.)"
r"|(^172\.2[0-9]\.)"
r"|(^172\.3[0-1]\.)"
r"|(^192\.168\.))"
)
ANONYMOUS_COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # One month
@ -63,7 +65,7 @@ class PaginatorMixin(object):
def paginate(self, qs, per_page=None):
paginator = Paginator(qs, per_page or self.per_page)
page = self.request.GET.get('p')
page = self.request.GET.get("p")
try:
qs = paginator.page(page)
except PageNotAnInteger:
@ -82,9 +84,11 @@ class Home(TemplateView, PaginatorMixin):
def get_context_data(self, **kwargs):
qs = Map.public
if (settings.UMAP_EXCLUDE_DEFAULT_MAPS and
'spatialite' not in settings.DATABASES['default']['ENGINE']):
# Unsupported query type for sqlite.
if (
settings.UMAP_EXCLUDE_DEFAULT_MAPS
and "spatialite" not in settings.DATABASES["default"]["ENGINE"]
):
# Unsupported query type for sqlite.
qs = qs.filter(center__distance_gt=(DEFAULT_CENTER, D(km=1)))
demo_map = None
if hasattr(settings, "UMAP_DEMO_PK"):
@ -102,14 +106,14 @@ class Home(TemplateView, PaginatorMixin):
pass
else:
qs = qs.exclude(id=showcase_map.pk)
maps = qs.order_by('-modified_at')[:50]
maps = qs.order_by("-modified_at")[:50]
maps = self.paginate(maps, settings.UMAP_MAPS_PER_PAGE)
return {
"maps": maps,
"demo_map": demo_map,
"showcase_map": showcase_map,
"DEMO_SITE": settings.UMAP_DEMO_SITE
"DEMO_SITE": settings.UMAP_DEMO_SITE,
}
def get_template_names(self):
@ -121,6 +125,7 @@ class Home(TemplateView, PaginatorMixin):
else:
return [self.template_name]
home = Home.as_view()
@ -128,13 +133,14 @@ class About(Home):
template_name = "umap/about.html"
about = About.as_view()
class UserMaps(DetailView, PaginatorMixin):
model = User
slug_url_kwarg = 'username'
slug_field = 'username'
slug_url_kwarg = "username"
slug_field = "username"
list_template_name = "umap/map_list.html"
context_object_name = "current_user"
@ -148,11 +154,9 @@ class UserMaps(DetailView, PaginatorMixin):
else:
per_page = settings.UMAP_MAPS_PER_PAGE
limit = 50
maps = maps.distinct().order_by('-modified_at')[:limit]
maps = maps.distinct().order_by("-modified_at")[:limit]
maps = self.paginate(maps, per_page)
kwargs.update({
"maps": maps
})
kwargs.update({"maps": maps})
return super(UserMaps, self).get_context_data(**kwargs)
def get_template_names(self):
@ -164,6 +168,7 @@ class UserMaps(DetailView, PaginatorMixin):
else:
return super(UserMaps, self).get_template_names()
user_maps = UserMaps.as_view()
@ -172,20 +177,17 @@ class Search(TemplateView, PaginatorMixin):
list_template_name = "umap/map_list.html"
def get_context_data(self, **kwargs):
q = self.request.GET.get('q')
q = self.request.GET.get("q")
results = []
if q:
where = "to_tsvector(name) @@ plainto_tsquery(%s)"
if getattr(settings, 'UMAP_USE_UNACCENT', False):
if getattr(settings, "UMAP_USE_UNACCENT", False):
where = "to_tsvector(unaccent(name)) @@ plainto_tsquery(unaccent(%s))" # noqa
results = Map.objects.filter(share_status=Map.PUBLIC)
results = results.extra(where=[where], params=[q])
results = results.order_by('-modified_at')
results = results.order_by("-modified_at")
results = self.paginate(results)
kwargs.update({
'maps': results,
'q': q
})
kwargs.update({"maps": results, "q": q})
return kwargs
def get_template_names(self):
@ -197,57 +199,52 @@ class Search(TemplateView, PaginatorMixin):
else:
return super(Search, self).get_template_names()
search = Search.as_view()
class MapsShowCase(View):
def get(self, *args, **kwargs):
maps = Map.public.filter(center__distance_gt=(DEFAULT_CENTER, D(km=1)))
maps = maps.order_by('-modified_at')[:2500]
maps = maps.order_by("-modified_at")[:2500]
def make(m):
description = m.description or ""
if m.owner:
description = u"{description}\n{by} [[{url}|{name}]]".format(
description = "{description}\n{by} [[{url}|{name}]]".format(
description=description,
by=_("by"),
url=reverse('user_maps',
kwargs={"username": m.owner.username}),
url=reverse("user_maps", kwargs={"username": m.owner.username}),
name=m.owner,
)
description = u"{}\n[[{}|{}]]".format(
description, m.get_absolute_url(), _("View the map"))
geometry = m.settings.get('geometry', json.loads(m.center.geojson))
description = "{}\n[[{}|{}]]".format(
description, m.get_absolute_url(), _("View the map")
)
geometry = m.settings.get("geometry", json.loads(m.center.geojson))
return {
"type": "Feature",
"geometry": geometry,
"properties": {
"name": m.name,
"description": description
}
"properties": {"name": m.name, "description": description},
}
geojson = {
"type": "FeatureCollection",
"features": [make(m) for m in maps]
}
geojson = {"type": "FeatureCollection", "features": [make(m) for m in maps]}
return HttpResponse(smart_bytes(json.dumps(geojson)))
showcase = MapsShowCase.as_view()
def validate_url(request):
assert request.method == "GET"
assert is_ajax(request)
url = request.GET.get('url')
url = request.GET.get("url")
assert url
try:
URLValidator(url)
except ValidationError:
raise AssertionError()
assert 'HTTP_REFERER' in request.META
referer = urlparse(request.META.get('HTTP_REFERER'))
assert "HTTP_REFERER" in request.META
referer = urlparse(request.META.get("HTTP_REFERER"))
toproxy = urlparse(url)
local = urlparse(settings.SITE_URL)
assert toproxy.hostname
@ -264,38 +261,39 @@ def validate_url(request):
class AjaxProxy(View):
def get(self, *args, **kwargs):
# You should not use this in production (use Nginx or so)
try:
url = validate_url(self.request)
except AssertionError:
return HttpResponseBadRequest()
headers = {
'User-Agent': 'uMapProxy +http://wiki.openstreetmap.org/wiki/UMap'
}
headers = {"User-Agent": "uMapProxy +http://wiki.openstreetmap.org/wiki/UMap"}
request = Request(url, headers=headers)
opener = build_opener()
try:
proxied_request = opener.open(request)
except HTTPError as e:
return HttpResponse(e.msg, status=e.code,
content_type='text/plain')
return HttpResponse(e.msg, status=e.code, content_type="text/plain")
else:
status_code = proxied_request.code
mimetype = proxied_request.headers.get('Content-Type') or mimetypes.guess_type(url) # noqa
mimetype = proxied_request.headers.get(
"Content-Type"
) or mimetypes.guess_type(
url
) # noqa
content = proxied_request.read()
# Quick hack to prevent Django from adding a Vary: Cookie header
self.request.session.accessed = False
response = HttpResponse(content, status=status_code,
content_type=mimetype)
response = HttpResponse(content, status=status_code, content_type=mimetype)
try:
ttl = int(self.request.GET.get('ttl'))
ttl = int(self.request.GET.get("ttl"))
except (TypeError, ValueError):
pass
else:
response['X-Accel-Expires'] = ttl
response["X-Accel-Expires"] = ttl
return response
ajax_proxy = AjaxProxy.as_view()
@ -303,6 +301,7 @@ ajax_proxy = AjaxProxy.as_view()
# Utils #
# ############## #
def _urls_for_js(urls=None):
"""
Return templated URLs prepared for javascript.
@ -310,10 +309,12 @@ def _urls_for_js(urls=None):
if urls is None:
# prevent circular import
from .urls import urlpatterns, i18n_urls
urls = [url.name for url in urlpatterns + i18n_urls
if getattr(url, 'name', None)]
urls = [
url.name for url in urlpatterns + i18n_urls if getattr(url, "name", None)
]
urls = dict(zip(urls, [get_uri_template(url) for url in urls]))
urls.update(getattr(settings, 'UMAP_EXTRA_URLS', {}))
urls.update(getattr(settings, "UMAP_EXTRA_URLS", {}))
return urls
@ -321,14 +322,8 @@ def render_to_json(templates, context, request):
"""
Generate a JSON HttpResponse with rendered template HTML.
"""
html = render_to_string(
templates,
context=context,
request=request
)
_json = json.dumps({
"html": html
})
html = render_to_string(templates, context=context, request=request)
_json = json.dumps({"html": html})
return HttpResponse(_json)
@ -342,15 +337,16 @@ def simple_json_response(**kwargs):
class FormLessEditMixin:
http_method_names = [u'post', ]
http_method_names = [
"post",
]
def form_invalid(self, form):
return simple_json_response(errors=form.errors,
error=str(form.errors))
return simple_json_response(errors=form.errors, error=str(form.errors))
def get_form(self, form_class=None):
kwargs = self.get_form_kwargs()
kwargs['error_class'] = FlatErrorList
kwargs["error_class"] = FlatErrorList
return self.get_form_class()(**kwargs)
@ -361,20 +357,22 @@ class MapDetailMixin:
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
properties = {
'urls': _urls_for_js(),
'tilelayers': TileLayer.get_list(),
'allowEdit': self.is_edit_allowed(),
'default_iconUrl': "%sumap/img/marker.png" % settings.STATIC_URL, # noqa
'umap_id': self.get_umap_id(),
'licences': dict((l.name, l.json) for l in Licence.objects.all()),
'edit_statuses': [(i, str(label)) for i, label in Map.EDIT_STATUS],
'share_statuses': [(i, str(label))
for i, label in Map.SHARE_STATUS if i != Map.BLOCKED],
'anonymous_edit_statuses': [(i, str(label)) for i, label
in AnonymousMapPermissionsForm.STATUS],
"urls": _urls_for_js(),
"tilelayers": TileLayer.get_list(),
"allowEdit": self.is_edit_allowed(),
"default_iconUrl": "%sumap/img/marker.png" % settings.STATIC_URL, # noqa
"umap_id": self.get_umap_id(),
"licences": dict((l.name, l.json) for l in Licence.objects.all()),
"edit_statuses": [(i, str(label)) for i, label in Map.EDIT_STATUS],
"share_statuses": [
(i, str(label)) for i, label in Map.SHARE_STATUS if i != Map.BLOCKED
],
"anonymous_edit_statuses": [
(i, str(label)) for i, label in AnonymousMapPermissionsForm.STATUS
],
}
if self.get_short_url():
properties['shortUrl'] = self.get_short_url()
properties["shortUrl"] = self.get_short_url()
if settings.USE_I18N:
locale = settings.LANGUAGE_CODE
@ -382,23 +380,21 @@ class MapDetailMixin:
if hasattr(self.request, "LANGUAGE_CODE"):
locale = self.request.LANGUAGE_CODE
locale = to_locale(locale)
properties['locale'] = locale
context['locale'] = locale
properties["locale"] = locale
context["locale"] = locale
user = self.request.user
if not user.is_anonymous:
properties['user'] = {
'id': user.pk,
'name': user.get_username(),
'url': reverse(settings.USER_MAPS_URL,
args=(user.get_username(), ))
properties["user"] = {
"id": user.pk,
"name": user.get_username(),
"url": reverse(settings.USER_MAPS_URL, args=(user.get_username(),)),
}
map_settings = self.get_geojson()
if "properties" not in map_settings:
map_settings['properties'] = {}
map_settings['properties'].update(properties)
map_settings['properties']['datalayers'] = self.get_datalayers()
context['map_settings'] = json.dumps(map_settings,
indent=settings.DEBUG)
map_settings["properties"] = {}
map_settings["properties"].update(properties)
map_settings["properties"]["datalayers"] = self.get_datalayers()
context["map_settings"] = json.dumps(map_settings, indent=settings.DEBUG)
return context
def get_datalayers(self):
@ -414,12 +410,12 @@ class MapDetailMixin:
return {
"geometry": {
"coordinates": [DEFAULT_LONGITUDE, DEFAULT_LATITUDE],
"type": "Point"
"type": "Point",
},
"properties": {
"zoom": getattr(settings, 'LEAFLET_ZOOM', 6),
"zoom": getattr(settings, "LEAFLET_ZOOM", 6),
"datalayers": [],
}
},
}
def get_short_url(self):
@ -427,25 +423,27 @@ class MapDetailMixin:
class PermissionsMixin:
def get_permissions(self):
permissions = {}
permissions['edit_status'] = self.object.edit_status
permissions['share_status'] = self.object.share_status
permissions["edit_status"] = self.object.edit_status
permissions["share_status"] = self.object.share_status
if self.object.owner:
permissions['owner'] = {
'id': self.object.owner.pk,
'name': self.object.owner.get_username(),
'url': reverse(settings.USER_MAPS_URL,
args=(self.object.owner.get_username(), ))
permissions["owner"] = {
"id": self.object.owner.pk,
"name": self.object.owner.get_username(),
"url": reverse(
settings.USER_MAPS_URL, args=(self.object.owner.get_username(),)
),
}
permissions['editors'] = [{
'id': editor.pk,
'name': editor.get_username(),
} for editor in self.object.editors.all()]
if (not self.object.owner
and self.object.is_anonymous_owner(self.request)):
permissions['anonymous_edit_url'] = self.get_anonymous_edit_url()
permissions["editors"] = [
{
"id": editor.pk,
"name": editor.get_username(),
}
for editor in self.object.editors.all()
]
if not self.object.owner and self.object.is_anonymous_owner(self.request):
permissions["anonymous_edit_url"] = self.get_anonymous_edit_url()
return permissions
def get_anonymous_edit_url(self):
@ -454,13 +452,12 @@ class PermissionsMixin:
class MapView(MapDetailMixin, PermissionsMixin, DetailView):
def get(self, request, *args, **kwargs):
self.object = self.get_object()
canonical = self.get_canonical_url()
if not request.path == canonical:
if request.META.get('QUERY_STRING'):
canonical = "?".join([canonical, request.META['QUERY_STRING']])
if request.META.get("QUERY_STRING"):
canonical = "?".join([canonical, request.META["QUERY_STRING"]])
return HttpResponsePermanentRedirect(canonical)
if not self.object.can_view(request):
return HttpResponseForbidden()
@ -481,29 +478,26 @@ class MapView(MapDetailMixin, PermissionsMixin, DetailView):
def get_short_url(self):
shortUrl = None
if hasattr(settings, 'SHORT_SITE_URL'):
short_path = reverse_lazy('map_short_url',
kwargs={'pk': self.object.pk})
if hasattr(settings, "SHORT_SITE_URL"):
short_path = reverse_lazy("map_short_url", kwargs={"pk": self.object.pk})
shortUrl = "%s%s" % (settings.SHORT_SITE_URL, short_path)
return shortUrl
def get_geojson(self):
map_settings = self.object.settings
if "properties" not in map_settings:
map_settings['properties'] = {}
map_settings['properties']['name'] = self.object.name
map_settings['properties']['permissions'] = self.get_permissions()
map_settings["properties"] = {}
map_settings["properties"]["name"] = self.object.name
map_settings["properties"]["permissions"] = self.get_permissions()
return map_settings
class MapViewGeoJSON(MapView):
def get_canonical_url(self):
return reverse('map_geojson', args=(self.object.pk, ))
return reverse("map_geojson", args=(self.object.pk,))
def render_to_response(self, context, *args, **kwargs):
return HttpResponse(context['map_settings'],
content_type='application/json')
return HttpResponse(context["map_settings"], content_type="application/json")
class MapNew(MapDetailMixin, TemplateView):
@ -529,19 +523,17 @@ class MapCreate(FormLessEditMixin, PermissionsMixin, CreateView):
msg = _("Congratulations, your map has been created!")
permissions = self.get_permissions()
# User does not have the cookie yet.
permissions['anonymous_edit_url'] = anonymous_url
permissions["anonymous_edit_url"] = anonymous_url
response = simple_json_response(
id=self.object.pk,
url=self.object.get_absolute_url(),
permissions=permissions,
info=msg
info=msg,
)
if not self.request.user.is_authenticated:
key, value = self.object.signed_cookie_elements
response.set_signed_cookie(
key=key,
value=value,
max_age=ANONYMOUS_COOKIE_MAX_AGE
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
return response
@ -549,7 +541,7 @@ class MapCreate(FormLessEditMixin, PermissionsMixin, CreateView):
class MapUpdate(FormLessEditMixin, PermissionsMixin, UpdateView):
model = Map
form_class = MapSettingsForm
pk_url_kwarg = 'map_id'
pk_url_kwarg = "map_id"
def form_valid(self, form):
self.object.settings = form.cleaned_data["settings"]
@ -564,7 +556,7 @@ class MapUpdate(FormLessEditMixin, PermissionsMixin, UpdateView):
class UpdateMapPermissions(FormLessEditMixin, UpdateView):
model = Map
pk_url_kwarg = 'map_id'
pk_url_kwarg = "map_id"
def get_form_class(self):
if self.object.owner:
@ -576,25 +568,25 @@ class UpdateMapPermissions(FormLessEditMixin, UpdateView):
form = super().get_form(form_class)
user = self.request.user
if self.object.owner and not user == self.object.owner:
del form.fields['edit_status']
del form.fields['share_status']
del form.fields['owner']
del form.fields["edit_status"]
del form.fields["share_status"]
del form.fields["owner"]
return form
def form_valid(self, form):
self.object = form.save()
return simple_json_response(
info=_("Map editors updated with success!"))
return simple_json_response(info=_("Map editors updated with success!"))
class AttachAnonymousMap(View):
def post(self, *args, **kwargs):
self.object = kwargs['map_inst']
if (self.object.owner
or not self.object.is_anonymous_owner(self.request)
or not self.object.can_edit(self.request.user, self.request)
or not self.request.user.is_authenticated):
self.object = kwargs["map_inst"]
if (
self.object.owner
or not self.object.is_anonymous_owner(self.request)
or not self.object.can_edit(self.request.user, self.request)
or not self.request.user.is_authenticated
):
return HttpResponseForbidden()
self.object.owner = self.request.user
self.object.save()
@ -608,30 +600,27 @@ class MapDelete(DeleteView):
def form_valid(self, form):
self.object = self.get_object()
if self.object.owner and self.request.user != self.object.owner:
return HttpResponseForbidden(
_('Only its owner can delete the map.'))
if not self.object.owner\
and not self.object.is_anonymous_owner(self.request):
return HttpResponseForbidden(_("Only its owner can delete the map."))
if not self.object.owner and not self.object.is_anonymous_owner(self.request):
return HttpResponseForbidden()
self.object.delete()
return simple_json_response(redirect="/")
class MapClone(PermissionsMixin, View):
def post(self, *args, **kwargs):
if not getattr(settings, "UMAP_ALLOW_ANONYMOUS", False) \
and not self.request.user.is_authenticated:
if (
not getattr(settings, "UMAP_ALLOW_ANONYMOUS", False)
and not self.request.user.is_authenticated
):
return HttpResponseForbidden()
owner = self.request.user if self.request.user.is_authenticated else None
self.object = kwargs['map_inst'].clone(owner=owner)
self.object = kwargs["map_inst"].clone(owner=owner)
response = simple_json_response(redirect=self.object.get_absolute_url())
if not self.request.user.is_authenticated:
key, value = self.object.signed_cookie_elements
response.set_signed_cookie(
key=key,
value=value,
max_age=ANONYMOUS_COOKIE_MAX_AGE
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
msg = _(
"Your map has been cloned! If you want to edit this map from "
@ -649,10 +638,10 @@ class MapShortUrl(RedirectView):
permanent = True
def get_redirect_url(self, **kwargs):
map_inst = get_object_or_404(Map, pk=kwargs['pk'])
map_inst = get_object_or_404(Map, pk=kwargs["pk"])
url = map_inst.get_absolute_url()
if self.query_string:
args = self.request.META.get('QUERY_STRING', '')
args = self.request.META.get("QUERY_STRING", "")
if args:
url = "%s?%s" % (url, args)
return url
@ -665,7 +654,7 @@ class MapAnonymousEditUrl(RedirectView):
def get(self, request, *args, **kwargs):
signer = Signer()
try:
pk = signer.unsign(self.kwargs['signature'])
pk = signer.unsign(self.kwargs["signature"])
except BadSignature:
return HttpResponseForbidden()
else:
@ -675,9 +664,7 @@ class MapAnonymousEditUrl(RedirectView):
if not map_inst.owner:
key, value = map_inst.signed_cookie_elements
response.set_signed_cookie(
key=key,
value=value,
max_age=ANONYMOUS_COOKIE_MAX_AGE
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
return response
@ -734,15 +721,16 @@ class DataLayerView(GZipMixin, BaseDetailView):
if getattr(settings, 'UMAP_XSENDFILE_HEADER', None):
response = HttpResponse()
path = path.replace(settings.MEDIA_ROOT, '/internal')
path = path.replace(settings.MEDIA_ROOT, "/internal")
response[settings.UMAP_XSENDFILE_HEADER] = path
else:
# TODO IMS
statobj = os.stat(path)
with open(path, 'rb') as f:
with open(path, "rb") as f:
# Should not be used in production!
response = HttpResponse(
f.read(), # should not be used in production!
content_type='application/json'
f.read(),
content_type="application/json"
)
response["Last-Modified"] = http_date(statobj.st_mtime)
response['ETag'] = self.etag()
@ -754,11 +742,11 @@ class DataLayerView(GZipMixin, BaseDetailView):
class DataLayerVersion(DataLayerView):
def _path(self):
return '{root}/{path}'.format(
return "{root}/{path}".format(
root=settings.MEDIA_ROOT,
path=self.object.get_version_path(self.kwargs['name']))
path=self.object.get_version_path(self.kwargs["name"]),
)
class DataLayerCreate(FormLessEditMixin, GZipMixin, CreateView):
@ -798,7 +786,7 @@ class DataLayerUpdate(FormLessEditMixin, GZipMixin, UpdateView):
def post(self, request, *args, **kwargs):
self.object = self.get_object()
if self.object.map != self.kwargs['map_inst']:
if self.object.map != self.kwargs["map_inst"]:
return HttpResponseForbidden()
if not self.if_match():
return HttpResponse(status=412)
@ -810,7 +798,7 @@ class DataLayerDelete(DeleteView):
def form_valid(self, form):
self.object = self.get_object()
if self.object.map != self.kwargs['map_inst']:
if self.object.map != self.kwargs["map_inst"]:
return HttpResponseForbidden()
self.object.delete()
return simple_json_response(info=_("Layer successfully deleted."))
@ -827,6 +815,7 @@ class DataLayerVersions(BaseDetailView):
# Picto #
# ############## #
class PictogramJSONList(ListView):
model = Pictogram
@ -839,6 +828,7 @@ class PictogramJSONList(ListView):
# Generic #
# ############## #
def logout(request):
do_logout(request)
return simple_json_response(redirect="/")
@ -849,4 +839,5 @@ class LoginPopupEnd(TemplateView):
End of a loggin process in popup.
Basically close the popup.
"""
template_name = "umap/login_popup_end.html"