umap/umap/views.py
Alexis Métaireau f37ed700f6 [feat] add a simple conflict resolution mecanism.
The server tries to merge conflicting saves of
the same layer.

What it does:

- use the `If-Unmodified-Since` header to check
  if changes happened to the stored data ;
- Compare the incoming version with its reference version
  to get a diff.
- Reapply the diff on top of the latest version.
- If the merge is not possible, return a
  "422 Conflict" HTTP response.
- If the merge worked, return the merged document,
  to be updated by the client.
2023-11-29 13:04:24 +01:00

1112 lines
36 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import mimetypes
import os
import re
import socket
from datetime import datetime, timedelta
from http.client import InvalidURL
from io import BytesIO
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urlparse
from urllib.request import Request, build_opener
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth import logout as do_logout
from django.contrib.gis.measure import D
from django.contrib.postgres.search import SearchQuery, SearchVector
from django.contrib.staticfiles.storage import staticfiles_storage
from django.core.mail import send_mail
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.core.signing import BadSignature, Signer
from django.core.validators import URLValidator, ValidationError
from django.db.models import Q
from django.http import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponsePermanentRedirect,
HttpResponseRedirect,
)
from django.middleware.gzip import re_accepts_gzip
from django.shortcuts import get_object_or_404
from django.urls import reverse, reverse_lazy
from django.utils.encoding import smart_bytes
from django.utils.http import http_date
from django.utils.timezone import make_aware
from django.utils.translation import gettext as _
from django.utils.translation import to_locale
from django.views.decorators.cache import cache_control
from django.views.decorators.http import require_GET
from django.views.generic import DetailView, TemplateView, View
from django.views.generic.base import RedirectView
from django.views.generic.detail import BaseDetailView
from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView
from django.views.generic.list import ListView
from . import VERSION
from .forms import (
DEFAULT_CENTER,
DEFAULT_LATITUDE,
DEFAULT_LONGITUDE,
AnonymousDataLayerPermissionsForm,
AnonymousMapPermissionsForm,
DataLayerForm,
DataLayerPermissionsForm,
FlatErrorList,
MapSettingsForm,
SendLinkForm,
UpdateMapPermissionsForm,
UserProfileForm,
)
from .models import DataLayer, Licence, Map, Pictogram, Star, TileLayer
from .utils import ConflictError, get_uri_template, gzip_file, is_ajax, merge_features
User = get_user_model()
PRIVATE_IP = re.compile(
r"((^127\.)|(^10\.)"
r"|(^172\.1[6-9]\.)"
r"|(^172\.2[0-9]\.)"
r"|(^172\.3[0-1]\.)"
r"|(^192\.168\.))"
)
ANONYMOUS_COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # One month
class PaginatorMixin:
per_page = 5
def paginate(self, qs, per_page=None):
paginator = Paginator(qs, per_page or self.per_page)
page = self.request.GET.get("p")
try:
qs = paginator.page(page)
except PageNotAnInteger:
# If page is not an integer, deliver first page.
qs = paginator.page(1)
except EmptyPage:
# If page is out of range (e.g. 9999), deliver last page of
# results.
qs = paginator.page(paginator.num_pages)
return qs
def get_context_data(self, **kwargs):
kwargs.update({"is_ajax": is_ajax(self.request)})
return super().get_context_data(**kwargs)
def get_template_names(self):
"""
Dispatch template according to the kind of request: ajax or normal.
"""
if is_ajax(self.request):
return [self.list_template_name]
return super().get_template_names()
class PublicMapsMixin(object):
def get_public_maps(self):
qs = Map.public
if (
settings.UMAP_EXCLUDE_DEFAULT_MAPS
and "spatialite" not in settings.DATABASES["default"]["ENGINE"]
):
# Unsupported query type for sqlite.
qs = qs.filter(center__distance_gt=(DEFAULT_CENTER, D(km=1)))
maps = qs.order_by("-modified_at")
return maps
class Home(PaginatorMixin, TemplateView, PublicMapsMixin):
template_name = "umap/home.html"
list_template_name = "umap/map_list.html"
def get_context_data(self, **kwargs):
maps = self.get_public_maps()
demo_map = None
if hasattr(settings, "UMAP_DEMO_PK"):
try:
demo_map = Map.public.get(pk=settings.UMAP_DEMO_PK)
except Map.DoesNotExist:
pass
showcase_map = None
if hasattr(settings, "UMAP_SHOWCASE_PK"):
try:
showcase_map = Map.public.get(pk=settings.UMAP_SHOWCASE_PK)
except Map.DoesNotExist:
pass
maps = self.paginate(maps, settings.UMAP_MAPS_PER_PAGE)
return {
"maps": maps,
"demo_map": demo_map,
"showcase_map": showcase_map,
}
home = Home.as_view()
class About(Home):
template_name = "umap/about.html"
about = About.as_view()
class UserProfile(UpdateView):
model = User
form_class = UserProfileForm
success_url = reverse_lazy("user_profile")
def get_object(self):
return self.get_queryset().get(pk=self.request.user.pk)
def get_context_data(self, **kwargs):
kwargs.update(
{"providers": self.object.social_auth.values_list("provider", flat=True)}
)
return super().get_context_data(**kwargs)
user_profile = UserProfile.as_view()
class UserMaps(PaginatorMixin, DetailView):
model = User
slug_url_kwarg = "identifier"
slug_field = settings.USER_URL_FIELD
list_template_name = "umap/map_list.html"
context_object_name = "current_user"
def is_owner(self):
return self.request.user == self.object
@property
def per_page(self):
if self.is_owner():
return settings.UMAP_MAPS_PER_PAGE_OWNER
return settings.UMAP_MAPS_PER_PAGE
def get_maps(self):
qs = Map.public
qs = qs.filter(owner=self.object).union(qs.filter(editors=self.object))
return qs.order_by("-modified_at")
def get_context_data(self, **kwargs):
kwargs.update({"maps": self.paginate(self.get_maps(), self.per_page)})
return super().get_context_data(**kwargs)
user_maps = UserMaps.as_view()
class UserStars(UserMaps):
template_name = "auth/user_stars.html"
def get_maps(self):
stars = Star.objects.filter(by=self.object).values("map")
qs = Map.public.filter(pk__in=stars)
return qs.order_by("-modified_at")
user_stars = UserStars.as_view()
class SearchMixin:
def get_search_queryset(self, **kwargs):
q = self.request.GET.get("q")
if q:
vector = SearchVector("name", config=settings.UMAP_SEARCH_CONFIGURATION)
query = SearchQuery(
q, config=settings.UMAP_SEARCH_CONFIGURATION, search_type="websearch"
)
return Map.objects.annotate(search=vector).filter(search=query)
class Search(PaginatorMixin, TemplateView, PublicMapsMixin, SearchMixin):
template_name = "umap/search.html"
list_template_name = "umap/map_list.html"
def get_context_data(self, **kwargs):
qs = self.get_search_queryset()
qs_count = 0
results = []
if qs is not None:
qs = qs.filter(share_status=Map.PUBLIC).order_by("-modified_at")
qs_count = qs.count()
results = self.paginate(qs)
else:
results = self.get_public_maps()[: settings.UMAP_MAPS_PER_SEARCH]
kwargs.update({"maps": results, "count": qs_count})
return kwargs
@property
def per_page(self):
return settings.UMAP_MAPS_PER_SEARCH
search = Search.as_view()
class UserDashboard(PaginatorMixin, DetailView, SearchMixin):
model = User
template_name = "umap/user_dashboard.html"
list_template_name = "umap/map_table.html"
def get_object(self):
return self.get_queryset().get(pk=self.request.user.pk)
def get_maps(self):
qs = self.get_search_queryset() or Map.objects.all()
qs = qs.filter(owner=self.object).union(qs.filter(editors=self.object))
return qs.order_by("-modified_at")
def get_context_data(self, **kwargs):
kwargs.update(
{"maps": self.paginate(self.get_maps(), settings.UMAP_MAPS_PER_PAGE_OWNER)}
)
return super().get_context_data(**kwargs)
user_dashboard = UserDashboard.as_view()
class MapsShowCase(View):
def get(self, *args, **kwargs):
maps = Map.public.filter(center__distance_gt=(DEFAULT_CENTER, D(km=1)))
maps = maps.order_by("-modified_at")[:2500]
def make(m):
description = m.description or ""
if m.owner:
description = "{description}\n{by} [[{url}|{name}]]".format(
description=description,
by=_("by"),
url=m.owner.get_url(),
name=m.owner,
)
description = "{}\n[[{}|{}]]".format(
description, m.get_absolute_url(), _("View the map")
)
geometry = m.settings.get("geometry", json.loads(m.center.geojson))
return {
"type": "Feature",
"geometry": geometry,
"properties": {"name": m.name, "description": description},
}
geojson = {"type": "FeatureCollection", "features": [make(m) for m in maps]}
return HttpResponse(smart_bytes(json.dumps(geojson)))
showcase = MapsShowCase.as_view()
def validate_url(request):
assert request.method == "GET"
assert is_ajax(request)
url = request.GET.get("url")
assert url
try:
URLValidator(url)
except ValidationError:
raise AssertionError()
assert "HTTP_REFERER" in request.META
referer = urlparse(request.META.get("HTTP_REFERER"))
toproxy = urlparse(url)
local = urlparse(settings.SITE_URL)
assert toproxy.hostname
assert referer.hostname == local.hostname
assert toproxy.hostname != "localhost"
assert toproxy.netloc != local.netloc
try:
# clean this when in python 3.4
ipaddress = socket.gethostbyname(toproxy.hostname)
except:
raise AssertionError()
assert not PRIVATE_IP.match(ipaddress)
return url
class AjaxProxy(View):
def get(self, *args, **kwargs):
try:
url = validate_url(self.request)
except AssertionError:
return HttpResponseBadRequest()
try:
ttl = int(self.request.GET.get("ttl"))
except (TypeError, ValueError):
ttl = None
if getattr(settings, "UMAP_XSENDFILE_HEADER", None):
response = HttpResponse()
response[settings.UMAP_XSENDFILE_HEADER] = f"/proxy/{quote(url)}"
if ttl:
response["X-Accel-Expires"] = ttl
return response
# You should not use this in production (use Nginx or so)
headers = {"User-Agent": "uMapProxy +http://wiki.openstreetmap.org/wiki/UMap"}
request = Request(url, headers=headers)
opener = build_opener()
try:
proxied_request = opener.open(request, timeout=10)
except HTTPError as e:
return HttpResponse(e.msg, status=e.code, content_type="text/plain")
except URLError:
return HttpResponseBadRequest("URL error")
except InvalidURL:
return HttpResponseBadRequest("Invalid URL")
except TimeoutError:
return HttpResponseBadRequest("Timeout")
else:
status_code = proxied_request.code
content_type = proxied_request.headers.get("Content-Type")
if not content_type:
content_type, encoding = mimetypes.guess_type(url)
content = proxied_request.read()
# Quick hack to prevent Django from adding a Vary: Cookie header
self.request.session.accessed = False
response = HttpResponse(
content, status=status_code, content_type=content_type
)
if ttl:
response["X-Accel-Expires"] = ttl
return response
ajax_proxy = AjaxProxy.as_view()
# ############## #
# Utils #
# ############## #
def _urls_for_js(urls=None):
"""
Return templated URLs prepared for javascript.
"""
if urls is None:
# prevent circular import
from .urls import i18n_urls, urlpatterns
urls = [
url.name for url in urlpatterns + i18n_urls if getattr(url, "name", None)
]
urls = dict(zip(urls, [get_uri_template(url) for url in urls]))
urls.update(getattr(settings, "UMAP_EXTRA_URLS", {}))
return urls
def simple_json_response(**kwargs):
return HttpResponse(json.dumps(kwargs), content_type="application/json")
# ############## #
# Map #
# ############## #
class FormLessEditMixin:
http_method_names = [
"post",
]
def form_invalid(self, form):
return simple_json_response(errors=form.errors, error=str(form.errors))
def get_form(self, form_class=None):
kwargs = self.get_form_kwargs()
kwargs["error_class"] = FlatErrorList
return self.get_form_class()(**kwargs)
class MapDetailMixin:
model = Map
pk_url_kwarg = "map_id"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
user = self.request.user
properties = {
"urls": _urls_for_js(),
"tilelayers": TileLayer.get_list(),
"editMode": self.edit_mode,
"default_iconUrl": "%sumap/img/marker.png" % settings.STATIC_URL, # noqa
"umap_id": self.get_umap_id(),
"starred": self.is_starred(),
"licences": dict((l.name, l.json) for l in Licence.objects.all()),
"share_statuses": [
(i, str(label)) for i, label in Map.SHARE_STATUS if i != Map.BLOCKED
],
"umap_version": VERSION,
"featuresHaveOwner": settings.UMAP_DEFAULT_FEATURES_HAVE_OWNERS,
}
created = bool(getattr(self, "object", None))
if (created and self.object.owner) or (not created and not user.is_anonymous):
map_statuses = Map.EDIT_STATUS
datalayer_statuses = DataLayer.EDIT_STATUS
else:
map_statuses = AnonymousMapPermissionsForm.STATUS
datalayer_statuses = AnonymousDataLayerPermissionsForm.STATUS
properties["edit_statuses"] = [(i, str(label)) for i, label in map_statuses]
properties["datalayer_edit_statuses"] = [
(i, str(label)) for i, label in datalayer_statuses
]
if self.get_short_url():
properties["shortUrl"] = self.get_short_url()
if settings.USE_I18N:
lang = settings.LANGUAGE_CODE
# Check attr in case the middleware is not active
if hasattr(self.request, "LANGUAGE_CODE"):
lang = self.request.LANGUAGE_CODE
properties["lang"] = lang
locale = to_locale(lang)
properties["locale"] = locale
context["locale"] = locale
if not user.is_anonymous:
properties["user"] = {
"id": user.pk,
"name": str(user),
"url": reverse("user_dashboard"),
}
map_settings = self.get_geojson()
if "properties" not in map_settings:
map_settings["properties"] = {}
map_settings["properties"].update(properties)
map_settings["properties"]["datalayers"] = self.get_datalayers()
context["map_settings"] = json.dumps(map_settings, indent=settings.DEBUG)
return context
def get_datalayers(self):
return []
@property
def edit_mode(self):
return "advanced"
def get_umap_id(self):
return None
def is_starred(self):
return False
def get_geojson(self):
return {
"geometry": {
"coordinates": [DEFAULT_LONGITUDE, DEFAULT_LATITUDE],
"type": "Point",
},
"properties": {
"zoom": getattr(settings, "LEAFLET_ZOOM", 6),
"datalayers": [],
},
}
def get_short_url(self):
return None
class PermissionsMixin:
def get_permissions(self):
permissions = {}
permissions["edit_status"] = self.object.edit_status
permissions["share_status"] = self.object.share_status
if self.object.owner:
permissions["owner"] = {
"id": self.object.owner.pk,
"name": str(self.object.owner),
"url": self.object.owner.get_url(),
}
permissions["editors"] = [
{"id": editor.pk, "name": str(editor)}
for editor in self.object.editors.all()
]
if not self.object.owner and self.object.is_anonymous_owner(self.request):
permissions["anonymous_edit_url"] = self.object.get_anonymous_edit_url()
return permissions
class MapView(MapDetailMixin, PermissionsMixin, DetailView):
def get(self, request, *args, **kwargs):
self.object = self.get_object()
canonical = self.get_canonical_url()
if not request.path == canonical:
if request.META.get("QUERY_STRING"):
canonical = "?".join([canonical, request.META["QUERY_STRING"]])
return HttpResponsePermanentRedirect(canonical)
return super(MapView, self).get(request, *args, **kwargs)
def get_canonical_url(self):
return self.object.get_absolute_url()
def get_datalayers(self):
return [
l.metadata(self.request.user, self.request)
for l in self.object.datalayer_set.all()
]
@property
def edit_mode(self):
edit_mode = "disabled"
if self.object.can_edit(self.request.user, self.request):
edit_mode = "advanced"
elif any(
d.can_edit(self.request.user, self.request)
for d in self.object.datalayer_set.all()
):
edit_mode = "simple"
return edit_mode
def get_umap_id(self):
return self.object.pk
def get_short_url(self):
short_url = None
if getattr(settings, "SHORT_SITE_URL", None):
short_path = reverse_lazy("map_short_url", kwargs={"pk": self.object.pk})
short_url = "%s%s" % (settings.SHORT_SITE_URL, short_path)
return short_url
def get_geojson(self):
map_settings = self.object.settings
if "properties" not in map_settings:
map_settings["properties"] = {}
map_settings["properties"]["name"] = self.object.name
map_settings["properties"]["permissions"] = self.get_permissions()
return map_settings
def is_starred(self):
user = self.request.user
if not user.is_authenticated:
return False
return Star.objects.filter(by=user, map=self.object).exists()
class MapDownload(DetailView):
model = Map
pk_url_kwarg = "map_id"
def get_canonical_url(self):
return reverse("map_download", args=(self.object.pk,))
def render_to_response(self, context, *args, **kwargs):
geojson = self.object.settings
geojson["type"] = "umap"
geojson["uri"] = self.request.build_absolute_uri(self.object.get_absolute_url())
datalayers = []
for datalayer in self.object.datalayer_set.all():
with open(datalayer.geojson.path, "rb") as f:
layer = json.loads(f.read())
if datalayer.settings:
layer["_umap_options"] = datalayer.settings
datalayers.append(layer)
geojson["layers"] = datalayers
response = simple_json_response(**geojson)
response[
"Content-Disposition"
] = f'attachment; filename="umap_backup_{self.object.slug}.umap"'
return response
class MapViewGeoJSON(MapView):
def get_canonical_url(self):
return reverse("map_geojson", args=(self.object.pk,))
def render_to_response(self, context, *args, **kwargs):
return HttpResponse(context["map_settings"], content_type="application/json")
class MapNew(MapDetailMixin, TemplateView):
template_name = "umap/map_detail.html"
class MapCreate(FormLessEditMixin, PermissionsMixin, CreateView):
model = Map
form_class = MapSettingsForm
def form_valid(self, form):
if self.request.user.is_authenticated:
form.instance.owner = self.request.user
self.object = form.save()
permissions = self.get_permissions()
# User does not have the cookie yet.
if not self.object.owner:
anonymous_url = self.object.get_anonymous_edit_url()
permissions["anonymous_edit_url"] = anonymous_url
response = simple_json_response(
id=self.object.pk,
url=self.object.get_absolute_url(),
permissions=permissions,
)
if not self.request.user.is_authenticated:
key, value = self.object.signed_cookie_elements
response.set_signed_cookie(
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
return response
class MapUpdate(FormLessEditMixin, PermissionsMixin, UpdateView):
model = Map
form_class = MapSettingsForm
pk_url_kwarg = "map_id"
def form_valid(self, form):
self.object.settings = form.cleaned_data["settings"]
self.object.save()
return simple_json_response(
id=self.object.pk,
url=self.object.get_absolute_url(),
permissions=self.get_permissions(),
info=_("Map has been updated!"),
)
class UpdateMapPermissions(FormLessEditMixin, UpdateView):
model = Map
pk_url_kwarg = "map_id"
def get_form_class(self):
if self.object.owner:
return UpdateMapPermissionsForm
else:
return AnonymousMapPermissionsForm
def get_form(self, form_class=None):
form = super().get_form(form_class)
user = self.request.user
if self.object.owner and not user == self.object.owner:
del form.fields["edit_status"]
del form.fields["share_status"]
del form.fields["owner"]
return form
def form_valid(self, form):
self.object = form.save()
return simple_json_response(info=_("Map editors updated with success!"))
class AttachAnonymousMap(View):
def post(self, *args, **kwargs):
self.object = kwargs["map_inst"]
if (
self.object.owner
or not self.object.is_anonymous_owner(self.request)
or not self.object.can_edit(self.request.user, self.request)
or not self.request.user.is_authenticated
):
return HttpResponseForbidden()
self.object.owner = self.request.user
self.object.save()
return simple_json_response()
class SendEditLink(FormLessEditMixin, FormView):
form_class = SendLinkForm
def post(self, form, **kwargs):
self.object = kwargs["map_inst"]
if (
self.object.owner
or not self.object.is_anonymous_owner(self.request)
or not self.object.can_edit(self.request.user, self.request)
):
return HttpResponseForbidden()
form = self.get_form()
if form.is_valid():
email = form.cleaned_data["email"]
else:
return HttpResponseBadRequest("Invalid")
link = self.object.get_anonymous_edit_url()
send_mail(
_(
"The uMap edit link for your map: %(map_name)s"
% {"map_name": self.object.name}
),
_("Here is your secret edit link: %(link)s" % {"link": link}),
settings.FROM_EMAIL,
[email],
fail_silently=False,
)
return simple_json_response(
info=_("Email sent to %(email)s" % {"email": email})
)
class MapDelete(DeleteView):
model = Map
pk_url_kwarg = "map_id"
def form_valid(self, form):
self.object = self.get_object()
if self.object.owner and self.request.user != self.object.owner:
return HttpResponseForbidden(_("Only its owner can delete the map."))
if not self.object.owner and not self.object.is_anonymous_owner(self.request):
return HttpResponseForbidden()
self.object.delete()
return simple_json_response(redirect="/")
class MapClone(PermissionsMixin, View):
def post(self, *args, **kwargs):
if (
not getattr(settings, "UMAP_ALLOW_ANONYMOUS", False)
and not self.request.user.is_authenticated
):
return HttpResponseForbidden()
owner = self.request.user if self.request.user.is_authenticated else None
self.object = kwargs["map_inst"].clone(owner=owner)
response = simple_json_response(redirect=self.object.get_absolute_url())
if not self.request.user.is_authenticated:
key, value = self.object.signed_cookie_elements
response.set_signed_cookie(
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
msg = _(
"Your map has been cloned! If you want to edit this map from "
"another computer, please use this link: %(anonymous_url)s"
% {"anonymous_url": self.object.get_anonymous_edit_url()}
)
else:
msg = _("Congratulations, your map has been cloned!")
messages.info(self.request, msg)
return response
class ToggleMapStarStatus(View):
def post(self, *args, **kwargs):
map_inst = get_object_or_404(Map, pk=kwargs["map_id"])
qs = Star.objects.filter(map=map_inst, by=self.request.user)
if qs.exists():
qs.delete()
status = False
else:
Star.objects.create(map=map_inst, by=self.request.user)
status = True
return simple_json_response(starred=status)
class MapShortUrl(RedirectView):
query_string = True
permanent = True
def get_redirect_url(self, **kwargs):
map_inst = get_object_or_404(Map, pk=kwargs["pk"])
url = map_inst.get_absolute_url()
if self.query_string:
args = self.request.META.get("QUERY_STRING", "")
if args:
url = "%s?%s" % (url, args)
return url
class MapAnonymousEditUrl(RedirectView):
permanent = False
def get(self, request, *args, **kwargs):
signer = Signer()
try:
pk = signer.unsign(self.kwargs["signature"])
except BadSignature:
signer = Signer(algorithm="sha1")
try:
pk = signer.unsign(self.kwargs["signature"])
except BadSignature:
return HttpResponseForbidden()
map_inst = get_object_or_404(Map, pk=pk)
url = map_inst.get_absolute_url()
response = HttpResponseRedirect(url)
if not map_inst.owner:
key, value = map_inst.signed_cookie_elements
response.set_signed_cookie(
key=key, value=value, max_age=ANONYMOUS_COOKIE_MAX_AGE
)
return response
# ############## #
# DataLayer #
# ############## #
class GZipMixin(object):
EXT = ".gz"
@property
def path(self):
return self.object.geojson.path
@property
def gzip_path(self):
return Path(f"{self.path}{self.EXT}")
def compute_last_modified(self, path):
stat = os.stat(path)
return http_date(stat.st_mtime)
@property
def last_modified(self):
# Prior to 1.3.0 we did not set gzip mtime as geojson mtime,
# but we switched from If-Match header to IF-Unmodified-Since
# and when users accepts gzip their last modified value is the gzip
# (when umap is served by nginx and X-Accel-Redirect)
# one, so we need to compare with that value in that case.
# cf https://github.com/umap-project/umap/issues/1212
path = (
self.gzip_path
if self.accepts_gzip and self.gzip_path.exists()
else self.path
)
return self.compute_last_modified(path)
@property
def accepts_gzip(self):
return settings.UMAP_GZIP and re_accepts_gzip.search(
self.request.META.get("HTTP_ACCEPT_ENCODING", "")
)
class DataLayerView(GZipMixin, BaseDetailView):
model = DataLayer
def render_to_response(self, context, **response_kwargs):
response = None
path = self.path
# Generate gzip if needed
if self.accepts_gzip:
if not self.gzip_path.exists():
gzip_file(path, self.gzip_path)
if getattr(settings, "UMAP_XSENDFILE_HEADER", None):
response = HttpResponse()
path = path.replace(settings.MEDIA_ROOT, "/internal")
response[settings.UMAP_XSENDFILE_HEADER] = path
else:
# Do not use in production
# (no gzip/cache-control/If-Modified-Since/If-None-Match)
statobj = os.stat(path)
with open(path, "rb") as f:
# Should not be used in production!
response = HttpResponse(f.read(), content_type="application/geo+json")
response["Last-Modified"] = self.last_modified
response["Content-Length"] = statobj.st_size
return response
class DataLayerVersion(DataLayerView):
@property
def path(self):
return "{root}/{path}".format(
root=settings.MEDIA_ROOT,
path=self.object.get_version_path(self.kwargs["name"]),
)
class DataLayerCreate(FormLessEditMixin, GZipMixin, CreateView):
model = DataLayer
form_class = DataLayerForm
def form_valid(self, form):
form.instance.map = self.kwargs["map_inst"]
self.object = form.save()
# Simple response with only metadatas (including new id)
response = simple_json_response(
**self.object.metadata(self.request.user, self.request)
)
response["Last-Modified"] = self.last_modified
return response
class DataLayerUpdate(FormLessEditMixin, GZipMixin, UpdateView):
model = DataLayer
form_class = DataLayerForm
def has_been_modified_since(self, if_unmodified_since):
return if_unmodified_since and self.last_modified != if_unmodified_since
def merge(self, if_unmodified_since):
"""
Attempt to apply the incoming changes to the document the client was using, and
then merge it with the last document we have on storage.
Returns either None (if the merge failed) or the merged python GeoJSON object.
"""
# Use If-Modified-Since to find the correct version in our storage.
for name in self.object.get_versions():
path = os.path.join(settings.MEDIA_ROOT, self.object.get_version_path(name))
if if_unmodified_since == self.compute_last_modified(path):
with open(path) as f:
reference = json.loads(f.read())
break
else:
# If the document is not found, we can't merge.
return None
# New data received in the request.
entrant = json.loads(self.request.FILES["geojson"].read())
# Latest known version of the data.
with open(self.path) as f:
latest = json.loads(f.read())
try:
merged_features = merge_features(
reference["features"], latest["features"], entrant["features"]
)
latest["features"] = merged_features
return latest
except ConflictError:
return None
def post(self, request, *args, **kwargs):
self.object = self.get_object()
if self.object.map.pk != int(self.kwargs["map_id"]):
return HttpResponseForbidden()
if not self.object.can_edit(user=self.request.user, request=self.request):
return HttpResponseForbidden()
ius_header = self.request.META.get("HTTP_IF_UNMODIFIED_SINCE")
if self.has_been_modified_since(ius_header):
merged = self.merge(ius_header)
if not merged:
return HttpResponse(status=412)
# Replace the uploaded file by the merged version.
self.request.FILES["geojson"].file = BytesIO(
json.dumps(merged).encode("utf-8")
)
# Mark the data to be reloaded by form_valid
self.request.session["needs_reload"] = True
return super().post(request, *args, **kwargs)
def form_valid(self, form):
self.object = form.save()
data = {**self.object.metadata(self.request.user, self.request)}
if self.request.session.get("needs_reload"):
data["geojson"] = json.loads(self.object.geojson.read().decode())
self.request.session["needs_reload"] = False
response = simple_json_response(**data)
response["Last-Modified"] = self.last_modified
return response
class DataLayerDelete(DeleteView):
model = DataLayer
def form_valid(self, form):
self.object = self.get_object()
if self.object.map != self.kwargs["map_inst"]:
return HttpResponseForbidden()
self.object.delete()
return simple_json_response(info=_("Layer successfully deleted."))
class DataLayerVersions(BaseDetailView):
model = DataLayer
def render_to_response(self, context, **response_kwargs):
return simple_json_response(versions=self.object.versions)
class UpdateDataLayerPermissions(FormLessEditMixin, UpdateView):
model = DataLayer
pk_url_kwarg = "pk"
def get_form_class(self):
if self.object.map.owner:
return DataLayerPermissionsForm
else:
return AnonymousDataLayerPermissionsForm
def form_valid(self, form):
self.object = form.save()
return simple_json_response(info=_("Permissions updated with success!"))
# ############## #
# Picto #
# ############## #
class PictogramJSONList(ListView):
model = Pictogram
def render_to_response(self, context, **response_kwargs):
content = [p.json for p in Pictogram.objects.all()]
return simple_json_response(pictogram_list=content)
# ############## #
# Generic #
# ############## #
def stats(request):
last_week = make_aware(datetime.now()) - timedelta(days=7)
return simple_json_response(
**{
"version": VERSION,
"maps_count": Map.objects.count(),
"maps_active_last_week_count": Map.objects.filter(
modified_at__gt=last_week
).count(),
"users_count": User.objects.count(),
"users_active_last_week_count": User.objects.filter(
last_login__gt=last_week
).count(),
}
)
@require_GET
@cache_control(max_age=60 * 60 * 24, immutable=True, public=True) # One day.
def webmanifest(request):
return simple_json_response(
**{
"icons": [
{
"src": staticfiles_storage.url("umap/favicons/icon-192.png"),
"type": "image/png",
"sizes": "192x192",
},
{
"src": staticfiles_storage.url("umap/favicons/icon-512.png"),
"type": "image/png",
"sizes": "512x512",
},
]
}
)
def logout(request):
do_logout(request)
if is_ajax(request):
return simple_json_response(redirect="/")
return HttpResponseRedirect("/")
class LoginPopupEnd(TemplateView):
"""
End of a loggin process in popup.
Basically close the popup.
"""
template_name = "umap/login_popup_end.html"