From 73d19e849f3c869845e5edcfce63ae726ba84c61 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 22 Sep 2023 17:19:18 +0200 Subject: [PATCH] Refactor share_status check in datalayers views --- umap/decorators.py | 16 +++++++ umap/models.py | 2 +- umap/tests/conftest.py | 2 + umap/tests/test_datalayer_views.py | 72 +++++++++++++++++++++++++++--- umap/tests/test_map_views.py | 16 +++---- umap/urls.py | 18 +++++--- umap/views.py | 3 +- 7 files changed, 105 insertions(+), 24 deletions(-) diff --git a/umap/decorators.py b/umap/decorators.py index a0e13171..c096c1f4 100644 --- a/umap/decorators.py +++ b/umap/decorators.py @@ -47,6 +47,22 @@ def map_permissions_check(view_func): return wrapper +def can_view_map(view_func): + """ + Used for URLs dealing with viewing the map. + """ + + @wraps(view_func) + def wrapper(request, *args, **kwargs): + map_inst = get_object_or_404(Map, pk=kwargs["map_id"]) + kwargs["map_inst"] = map_inst # Avoid rerequesting the map in the view + if not map_inst.can_view(request): + return HttpResponseForbidden() + return view_func(request, *args, **kwargs) + + return wrapper + + def jsonize_view(view_func): @wraps(view_func) def wrapper(request, *args, **kwargs): diff --git a/umap/models.py b/umap/models.py index 1ce15418..7b804642 100644 --- a/umap/models.py +++ b/umap/models.py @@ -193,7 +193,7 @@ class Map(NamedModel): public = PublicManager() def get_absolute_url(self): - return reverse("map", kwargs={"slug": self.slug or "map", "pk": self.pk}) + return reverse("map", kwargs={"slug": self.slug or "map", "map_id": self.pk}) def get_anonymous_edit_url(self): signer = Signer() diff --git a/umap/tests/conftest.py b/umap/tests/conftest.py index a9a29726..6d168835 100644 --- a/umap/tests/conftest.py +++ b/umap/tests/conftest.py @@ -5,6 +5,8 @@ import pytest from django.core.cache import cache from django.core.signing import get_cookie_signer +from umap.models import Map + from .base import ( DataLayerFactory, MapFactory, diff --git a/umap/tests/test_datalayer_views.py b/umap/tests/test_datalayer_views.py index cf4b6b8d..43a6d49d 100644 --- a/umap/tests/test_datalayer_views.py +++ b/umap/tests/test_datalayer_views.py @@ -23,9 +23,12 @@ def post_data(): } -def test_get(client, settings, datalayer): - url = reverse("datalayer_view", args=(datalayer.pk,)) +def test_get_with_public_mode(client, settings, datalayer, map): + map.share_status = Map.PUBLIC + map.save() + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) response = client.get(url) + assert response.status_code == 200 assert response["Last-Modified"] is not None assert response["Cache-Control"] is not None assert "Content-Encoding" not in response @@ -35,8 +38,49 @@ def test_get(client, settings, datalayer): assert j["type"] == "FeatureCollection" +def test_get_with_open_mode(client, settings, datalayer, map): + map.share_status = Map.PUBLIC + map.save() + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) + response = client.get(url) + assert response.status_code == 200 + + +def test_get_with_blocked_mode(client, settings, datalayer, map): + map.share_status = Map.BLOCKED + map.save() + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) + response = client.get(url) + assert response.status_code == 403 + + +def test_cannot_get_datalayer_if_not_public(client, settings, datalayer, map): + map.share_status = Map.PRIVATE + map.save() + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) + response = client.get(url) + assert response.status_code == 403 + + +def test_cannot_get_datalayer_if_private_and_owner(client, settings, datalayer, map): + map.share_status = Map.PRIVATE + map.save() + client.login(username=map.owner.username, password="123123") + url = reverse( + "datalayer_view", + args=( + map.pk, + datalayer.pk, + ), + ) + response = client.get(url) + assert response.status_code == 200 + + def test_gzip_should_be_created_if_accepted(client, datalayer, map, post_data): - url = reverse("datalayer_view", args=(datalayer.pk,)) + map.share_status = Map.PUBLIC + map.save() + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) response = client.get(url, headers={"ACCEPT_ENCODING": "gzip"}) assert response.status_code == 200 flat = datalayer.geojson.path @@ -107,8 +151,10 @@ def test_should_not_be_possible_to_delete_with_wrong_map_id_in_url( def test_optimistic_concurrency_control_with_good_last_modified( client, datalayer, map, post_data ): + map.share_status = Map.PUBLIC + map.save() # Get Last-Modified - url = reverse("datalayer_view", args=(datalayer.pk,)) + url = reverse("datalayer_view", args=(map.pk, datalayer.pk)) response = client.get(url) last_modified = response["Last-Modified"] url = reverse("datalayer_update", args=(map.pk, datalayer.pk)) @@ -150,6 +196,8 @@ def test_optimistic_concurrency_control_with_empty_last_modified( def test_versions_should_return_versions(client, datalayer, map, settings): + map.share_status = Map.PUBLIC + map.save() root = datalayer.storage_root() datalayer.geojson.storage.save( "%s/%s_1440924889.geojson" % (root, datalayer.pk), ContentFile("{}") @@ -160,7 +208,7 @@ def test_versions_should_return_versions(client, datalayer, map, settings): datalayer.geojson.storage.save( "%s/%s_1440918637.geojson" % (root, datalayer.pk), ContentFile("{}") ) - url = reverse("datalayer_versions", args=(datalayer.pk,)) + url = reverse("datalayer_versions", args=(map.pk, datalayer.pk)) versions = json.loads(client.get(url).content.decode()) assert len(versions["versions"]) == 4 version = { @@ -172,13 +220,25 @@ def test_versions_should_return_versions(client, datalayer, map, settings): def test_version_should_return_one_version_geojson(client, datalayer, map): + map.share_status = Map.PUBLIC + map.save() root = datalayer.storage_root() name = "%s_1440924889.geojson" % datalayer.pk datalayer.geojson.storage.save("%s/%s" % (root, name), ContentFile("{}")) - url = reverse("datalayer_version", args=(datalayer.pk, name)) + url = reverse("datalayer_version", args=(map.pk, datalayer.pk, name)) assert client.get(url).content.decode() == "{}" +def test_version_should_return_403_if_not_allowed(client, datalayer, map): + map.share_status = Map.PRIVATE + map.save() + root = datalayer.storage_root() + name = "%s_1440924889.geojson" % datalayer.pk + datalayer.geojson.storage.save("%s/%s" % (root, name), ContentFile("{}")) + url = reverse("datalayer_version", args=(map.pk, datalayer.pk, name)) + assert client.get(url).status_code == 403 + + def test_update_readonly(client, datalayer, map, post_data, settings): settings.UMAP_READONLY = True url = reverse("datalayer_update", args=(map.pk, datalayer.pk)) diff --git a/umap/tests/test_map_views.py b/umap/tests/test_map_views.py index 59e7b92c..ab8519ab 100644 --- a/umap/tests/test_map_views.py +++ b/umap/tests/test_map_views.py @@ -119,17 +119,17 @@ def test_delete(client, map, datalayer): def test_wrong_slug_should_redirect_to_canonical(client, map): - url = reverse("map", kwargs={"pk": map.pk, "slug": "wrong-slug"}) - canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug}) + url = reverse("map", kwargs={"map_id": map.pk, "slug": "wrong-slug"}) + canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug}) response = client.get(url) assert response.status_code == 301 assert response["Location"] == canonical def test_wrong_slug_should_redirect_with_query_string(client, map): - url = reverse("map", kwargs={"pk": map.pk, "slug": "wrong-slug"}) + url = reverse("map", kwargs={"map_id": map.pk, "slug": "wrong-slug"}) url = "{}?allowEdit=0".format(url) - canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug}) + canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug}) canonical = "{}?allowEdit=0".format(canonical) response = client.get(url) assert response.status_code == 301 @@ -137,7 +137,7 @@ def test_wrong_slug_should_redirect_with_query_string(client, map): def test_should_not_consider_the_query_string_for_canonical_check(client, map): - url = reverse("map", kwargs={"pk": map.pk, "slug": map.slug}) + url = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug}) url = "{}?allowEdit=0".format(url) response = client.get(url) assert response.status_code == 200 @@ -145,7 +145,7 @@ def test_should_not_consider_the_query_string_for_canonical_check(client, map): def test_short_url_should_redirect_to_canonical(client, map): url = reverse("map_short_url", kwargs={"pk": map.pk}) - canonical = reverse("map", kwargs={"pk": map.pk, "slug": map.slug}) + canonical = reverse("map", kwargs={"map_id": map.pk, "slug": map.slug}) response = client.get(url) assert response.status_code == 301 assert response["Location"] == canonical @@ -390,7 +390,7 @@ def test_no_cookie_cant_delete(client, anonymap): @pytest.mark.usefixtures("allow_anonymous") def test_anonymous_edit_url(cookieclient, anonymap): url = anonymap.get_anonymous_edit_url() - canonical = reverse("map", kwargs={"pk": anonymap.pk, "slug": anonymap.slug}) + canonical = reverse("map", kwargs={"map_id": anonymap.pk, "slug": anonymap.slug}) response = cookieclient.get(url) assert response.status_code == 302 assert response["Location"] == canonical @@ -403,7 +403,7 @@ def test_sha1_anonymous_edit_url(cookieclient, anonymap): signer = Signer(algorithm="sha1") signature = signer.sign(anonymap.pk) url = reverse("map_anonymous_edit_url", kwargs={"signature": signature}) - canonical = reverse("map", kwargs={"pk": anonymap.pk, "slug": anonymap.slug}) + canonical = reverse("map", kwargs={"map_id": anonymap.pk, "slug": anonymap.slug}) response = cookieclient.get(url) assert response.status_code == 302 assert response["Location"] == canonical diff --git a/umap/urls.py b/umap/urls.py index ed5dec3c..a44aa11d 100644 --- a/umap/urls.py +++ b/umap/urls.py @@ -14,6 +14,7 @@ from .decorators import ( jsonize_view, login_required_if_not_anonymous_allowed, map_permissions_check, + can_view_map, ) from .utils import decorated_patterns @@ -47,7 +48,7 @@ i18n_urls = [ ), re_path(r"^logout/$", views.logout, name="logout"), re_path( - r"^map/(?P\d+)/geojson/$", + r"^map/(?P\d+)/geojson/$", views.MapViewGeoJSON.as_view(), name="map_geojson", ), @@ -63,28 +64,31 @@ i18n_urls = [ ), ] i18n_urls += decorated_patterns( - cache_control(must_revalidate=True), + [can_view_map, cache_control(must_revalidate=True)], re_path( - r"^datalayer/(?P[\d]+)/$", + r"^datalayer/(?P\d+)/(?P[\d]+)/$", views.DataLayerView.as_view(), name="datalayer_view", ), re_path( - r"^datalayer/(?P[\d]+)/versions/$", + r"^datalayer/(?P\d+)/(?P[\d]+)/versions/$", views.DataLayerVersions.as_view(), name="datalayer_versions", ), re_path( - r"^datalayer/(?P[\d]+)/(?P[_\w]+.geojson)$", + r"^datalayer/(?P\d+)/(?P[\d]+)/(?P[_\w]+.geojson)$", views.DataLayerVersion.as_view(), name="datalayer_version", ), ) i18n_urls += decorated_patterns( - [ensure_csrf_cookie], + [can_view_map, ensure_csrf_cookie], re_path( - r"^map/(?P[-_\w]+)_(?P\d+)$", views.MapView.as_view(), name="map" + r"^map/(?P[-_\w]+)_(?P\d+)$", views.MapView.as_view(), name="map" ), +) +i18n_urls += decorated_patterns( + [ensure_csrf_cookie], re_path(r"^map/new/$", views.MapNew.as_view(), name="map_new"), ) i18n_urls += decorated_patterns( diff --git a/umap/views.py b/umap/views.py index c42a5ead..67d899b4 100644 --- a/umap/views.py +++ b/umap/views.py @@ -441,6 +441,7 @@ class FormLessEditMixin: class MapDetailMixin: model = Map + pk_url_kwarg = "map_id" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) @@ -544,8 +545,6 @@ class MapView(MapDetailMixin, PermissionsMixin, DetailView): if request.META.get("QUERY_STRING"): canonical = "?".join([canonical, request.META["QUERY_STRING"]]) return HttpResponsePermanentRedirect(canonical) - if not self.object.can_view(request): - return HttpResponseForbidden() return super(MapView, self).get(request, *args, **kwargs) def get_canonical_url(self):