Refactor share_status check in datalayers views

This commit is contained in:
Yohan Boniface 2023-09-22 17:19:18 +02:00
parent 9bcc18b790
commit 73d19e849f
7 changed files with 105 additions and 24 deletions

View file

@ -47,6 +47,22 @@ def map_permissions_check(view_func):
return wrapper
def can_view_map(view_func):
"""
Used for URLs dealing with viewing the map.
"""
@wraps(view_func)
def wrapper(request, *args, **kwargs):
map_inst = get_object_or_404(Map, pk=kwargs["map_id"])
kwargs["map_inst"] = map_inst # Avoid rerequesting the map in the view
if not map_inst.can_view(request):
return HttpResponseForbidden()
return view_func(request, *args, **kwargs)
return wrapper
def jsonize_view(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):

View file

@ -193,7 +193,7 @@ class Map(NamedModel):
public = PublicManager()
def get_absolute_url(self):
return reverse("map", kwargs={"slug": self.slug or "map", "pk": self.pk})
return reverse("map", kwargs={"slug": self.slug or "map", "map_id": self.pk})
def get_anonymous_edit_url(self):
signer = Signer()

View file

@ -5,6 +5,8 @@ import pytest
from django.core.cache import cache
from django.core.signing import get_cookie_signer
from umap.models import Map
from .base import (
DataLayerFactory,
MapFactory,

View file

@ -23,9 +23,12 @@ def post_data():
}
def test_get(client, settings, datalayer):
url = reverse("datalayer_view", args=(datalayer.pk,))
def test_get_with_public_mode(client, settings, datalayer, map):
map.share_status = Map.PUBLIC
map.save()
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url)
assert response.status_code == 200
assert response["Last-Modified"] is not None
assert response["Cache-Control"] is not None
assert "Content-Encoding" not in response
@ -35,8 +38,49 @@ def test_get(client, settings, datalayer):
assert j["type"] == "FeatureCollection"
def test_get_with_open_mode(client, settings, datalayer, map):
map.share_status = Map.PUBLIC
map.save()
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url)
assert response.status_code == 200
def test_get_with_blocked_mode(client, settings, datalayer, map):
map.share_status = Map.BLOCKED
map.save()
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url)
assert response.status_code == 403
def test_cannot_get_datalayer_if_not_public(client, settings, datalayer, map):
map.share_status = Map.PRIVATE
map.save()
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url)
assert response.status_code == 403
def test_cannot_get_datalayer_if_private_and_owner(client, settings, datalayer, map):
map.share_status = Map.PRIVATE
map.save()
client.login(username=map.owner.username, password="123123")
url = reverse(
"datalayer_view",
args=(
map.pk,
datalayer.pk,
),
)
response = client.get(url)
assert response.status_code == 200
def test_gzip_should_be_created_if_accepted(client, datalayer, map, post_data):
url = reverse("datalayer_view", args=(datalayer.pk,))
map.share_status = Map.PUBLIC
map.save()
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url, headers={"ACCEPT_ENCODING": "gzip"})
assert response.status_code == 200
flat = datalayer.geojson.path
@ -107,8 +151,10 @@ def test_should_not_be_possible_to_delete_with_wrong_map_id_in_url(
def test_optimistic_concurrency_control_with_good_last_modified(
client, datalayer, map, post_data
):
map.share_status = Map.PUBLIC
map.save()
# Get Last-Modified
url = reverse("datalayer_view", args=(datalayer.pk,))
url = reverse("datalayer_view", args=(map.pk, datalayer.pk))
response = client.get(url)
last_modified = response["Last-Modified"]
url = reverse("datalayer_update", args=(map.pk, datalayer.pk))
@ -150,6 +196,8 @@ def test_optimistic_concurrency_control_with_empty_last_modified(
def test_versions_should_return_versions(client, datalayer, map, settings):
map.share_status = Map.PUBLIC
map.save()
root = datalayer.storage_root()
datalayer.geojson.storage.save(
"%s/%s_1440924889.geojson" % (root, datalayer.pk), ContentFile("{}")
@ -160,7 +208,7 @@ def test_versions_should_return_versions(client, datalayer, map, settings):
datalayer.geojson.storage.save(
"%s/%s_1440918637.geojson" % (root, datalayer.pk), ContentFile("{}")
)
url = reverse("datalayer_versions", args=(datalayer.pk,))
url = reverse("datalayer_versions", args=(map.pk, datalayer.pk))
versions = json.loads(client.get(url).content.decode())
assert len(versions["versions"]) == 4
version = {
@ -172,13 +220,25 @@ def test_versions_should_return_versions(client, datalayer, map, settings):
def test_version_should_return_one_version_geojson(client, datalayer, map):
map.share_status = Map.PUBLIC
map.save()
root = datalayer.storage_root()
name = "%s_1440924889.geojson" % datalayer.pk
datalayer.geojson.storage.save("%s/%s" % (root, name), ContentFile("{}"))
url = reverse("datalayer_version", args=(datalayer.pk, name))
url = reverse("datalayer_version", args=(map.pk, datalayer.pk, name))
assert client.get(url).content.decode() == "{}"
def test_version_should_return_403_if_not_allowed(client, datalayer, map):
map.share_status = Map.PRIVATE
map.save()
root = datalayer.storage_root()
name = "%s_1440924889.geojson" % datalayer.pk
datalayer.geojson.storage.save("%s/%s" % (root, name), ContentFile("{}"))
url = reverse("datalayer_version", args=(map.pk, datalayer.pk, name))
assert client.get(url).status_code == 403
def test_update_readonly(client, datalayer, map, post_data, settings):
settings.UMAP_READONLY = True
url = reverse("datalayer_update", args=(map.pk, datalayer.pk))

View file

@ -119,17 +119,17 @@ def test_delete(client, map, datalayer):
def test_wrong_slug_should_redirect_to_canonical(client, map):
url = reverse("map", kwargs={"pk": map.pk, "slug": "wrong-slug"})
canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug})
url = reverse("map", kwargs={"map_id": map.pk, "slug": "wrong-slug"})
canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug})
response = client.get(url)
assert response.status_code == 301
assert response["Location"] == canonical
def test_wrong_slug_should_redirect_with_query_string(client, map):
url = reverse("map", kwargs={"pk": map.pk, "slug": "wrong-slug"})
url = reverse("map", kwargs={"map_id": map.pk, "slug": "wrong-slug"})
url = "{}?allowEdit=0".format(url)
canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug})
canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug})
canonical = "{}?allowEdit=0".format(canonical)
response = client.get(url)
assert response.status_code == 301
@ -137,7 +137,7 @@ def test_wrong_slug_should_redirect_with_query_string(client, map):
def test_should_not_consider_the_query_string_for_canonical_check(client, map):
url = reverse("map", kwargs={"pk": map.pk, "slug": map.slug})
url = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug})
url = "{}?allowEdit=0".format(url)
response = client.get(url)
assert response.status_code == 200
@ -145,7 +145,7 @@ def test_should_not_consider_the_query_string_for_canonical_check(client, map):
def test_short_url_should_redirect_to_canonical(client, map):
url = reverse("map_short_url", kwargs={"pk": map.pk})
canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug})
canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug})
response = client.get(url)
assert response.status_code == 301
assert response["Location"] == canonical
@ -390,7 +390,7 @@ def test_no_cookie_cant_delete(client, anonymap):
@pytest.mark.usefixtures("allow_anonymous")
def test_anonymous_edit_url(cookieclient, anonymap):
url = anonymap.get_anonymous_edit_url()
canonical = reverse("map", kwargs={"pk": anonymap.pk, "slug": anonymap.slug})
canonical = reverse("map", kwargs={"map_id": anonymap.pk, "slug": anonymap.slug})
response = cookieclient.get(url)
assert response.status_code == 302
assert response["Location"] == canonical
@ -403,7 +403,7 @@ def test_sha1_anonymous_edit_url(cookieclient, anonymap):
signer = Signer(algorithm="sha1")
signature = signer.sign(anonymap.pk)
url = reverse("map_anonymous_edit_url", kwargs={"signature": signature})
canonical = reverse("map", kwargs={"pk": anonymap.pk, "slug": anonymap.slug})
canonical = reverse("map", kwargs={"map_id": anonymap.pk, "slug": anonymap.slug})
response = cookieclient.get(url)
assert response.status_code == 302
assert response["Location"] == canonical

View file

@ -14,6 +14,7 @@ from .decorators import (
jsonize_view,
login_required_if_not_anonymous_allowed,
map_permissions_check,
can_view_map,
)
from .utils import decorated_patterns
@ -47,7 +48,7 @@ i18n_urls = [
),
re_path(r"^logout/$", views.logout, name="logout"),
re_path(
r"^map/(?P<pk>\d+)/geojson/$",
r"^map/(?P<map_id>\d+)/geojson/$",
views.MapViewGeoJSON.as_view(),
name="map_geojson",
),
@ -63,28 +64,31 @@ i18n_urls = [
),
]
i18n_urls += decorated_patterns(
cache_control(must_revalidate=True),
[can_view_map, cache_control(must_revalidate=True)],
re_path(
r"^datalayer/(?P<pk>[\d]+)/$",
r"^datalayer/(?P<map_id>\d+)/(?P<pk>[\d]+)/$",
views.DataLayerView.as_view(),
name="datalayer_view",
),
re_path(
r"^datalayer/(?P<pk>[\d]+)/versions/$",
r"^datalayer/(?P<map_id>\d+)/(?P<pk>[\d]+)/versions/$",
views.DataLayerVersions.as_view(),
name="datalayer_versions",
),
re_path(
r"^datalayer/(?P<pk>[\d]+)/(?P<name>[_\w]+.geojson)$",
r"^datalayer/(?P<map_id>\d+)/(?P<pk>[\d]+)/(?P<name>[_\w]+.geojson)$",
views.DataLayerVersion.as_view(),
name="datalayer_version",
),
)
i18n_urls += decorated_patterns(
[ensure_csrf_cookie],
[can_view_map, ensure_csrf_cookie],
re_path(
r"^map/(?P<slug>[-_\w]+)_(?P<pk>\d+)$", views.MapView.as_view(), name="map"
r"^map/(?P<slug>[-_\w]+)_(?P<map_id>\d+)$", views.MapView.as_view(), name="map"
),
)
i18n_urls += decorated_patterns(
[ensure_csrf_cookie],
re_path(r"^map/new/$", views.MapNew.as_view(), name="map_new"),
)
i18n_urls += decorated_patterns(

View file

@ -441,6 +441,7 @@ class FormLessEditMixin:
class MapDetailMixin:
model = Map
pk_url_kwarg = "map_id"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
@ -544,8 +545,6 @@ class MapView(MapDetailMixin, PermissionsMixin, DetailView):
if request.META.get("QUERY_STRING"):
canonical = "?".join([canonical, request.META["QUERY_STRING"]])
return HttpResponsePermanentRedirect(canonical)
if not self.object.can_view(request):
return HttpResponseForbidden()
return super(MapView, self).get(request, *args, **kwargs)
def get_canonical_url(self):