[PATCH 1 of 3 v3] admin: users: factorize check for default user

Thomas De Schampheleire patrickdepinguin at gmail.com
Wed Jun 3 16:05:53 EDT 2015


# HG changeset patch
# User Thomas De Schampheleire <thomas.de.schampheleire at gmail.com>
# Date 1433359538 -7200
#      Wed Jun 03 21:25:38 2015 +0200
# Node ID 6a6119935ed636def910c4c4495ac2e4213d9ae7
# Parent  6e8effd028bf41a132aee02e52ffc0bf990dadf4
admin: users: factorize check for default user

Note that one specific unit test has been commented out because it relies on
pytest features (monkeypatch). Once pytest is the default test runner, the
test should be uncommented.

diff --git a/kallithea/controllers/admin/users.py b/kallithea/controllers/admin/users.py
--- a/kallithea/controllers/admin/users.py
+++ b/kallithea/controllers/admin/users.py
@@ -34,6 +34,7 @@ from pylons import request, tmpl_context
 from pylons.controllers.util import redirect
 from pylons.i18n.translation import _
 from sqlalchemy.sql.expression import func
+from webob.exc import HTTPNotFound
 
 import kallithea
 from kallithea.lib.exceptions import DefaultUserException, \
@@ -233,14 +234,17 @@ class UsersController(BaseController):
         # url('user', id=ID)
         User.get_or_404(-1)
 
+    def _get_user_or_raise_if_default(self, id):
+        try:
+            return User.get_or_404(id, allow_default=False)
+        except DefaultUserException:
+            h.flash(_("The default user cannot be edited"), category='warning')
+            raise HTTPNotFound
+
     def edit(self, id, format='html'):
         """GET /users/id/edit: Form to edit an existing item"""
         # url('edit_user', id=ID)
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'profile'
         c.extern_type = c.user.extern_type
         c.extern_name = c.user.extern_name
@@ -254,11 +258,7 @@ class UsersController(BaseController):
             force_defaults=False)
 
     def edit_advanced(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'advanced'
         c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
 
@@ -277,11 +277,7 @@ class UsersController(BaseController):
             force_defaults=False)
 
     def edit_api_keys(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'api_keys'
         show_expired = True
         c.lifetime_values = [
@@ -302,10 +298,7 @@ class UsersController(BaseController):
             force_defaults=False)
 
     def add_api_key(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
+        c.user = self._get_user_or_raise_if_default(id)
 
         lifetime = safe_int(request.POST.get('lifetime'), -1)
         description = request.POST.get('description')
@@ -315,10 +308,7 @@ class UsersController(BaseController):
         return redirect(url('edit_user_api_keys', id=c.user.user_id))
 
     def delete_api_key(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
+        c.user = self._get_user_or_raise_if_default(id)
 
         api_key = request.POST.get('del_api_key')
         if request.POST.get('del_api_key_builtin'):
@@ -339,11 +329,7 @@ class UsersController(BaseController):
         pass
 
     def edit_perms(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'perms'
         c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
 
@@ -402,11 +388,7 @@ class UsersController(BaseController):
         return redirect(url('edit_user_perms', id=id))
 
     def edit_emails(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'emails'
         c.user_email_map = UserEmailMap.query()\
             .filter(UserEmailMap.user == c.user).all()
@@ -449,11 +431,7 @@ class UsersController(BaseController):
         return redirect(url('edit_user_emails', id=id))
 
     def edit_ips(self, id):
-        c.user = User.get_or_404(id)
-        if c.user.username == User.DEFAULT_USER:
-            h.flash(_("You can't edit this user"), category='warning')
-            return redirect(url('users'))
-
+        c.user = self._get_user_or_raise_if_default(id)
         c.active = 'ips'
         c.user_ip_map = UserIpMap.query()\
             .filter(UserIpMap.user == c.user).all()
diff --git a/kallithea/lib/exceptions.py b/kallithea/lib/exceptions.py
--- a/kallithea/lib/exceptions.py
+++ b/kallithea/lib/exceptions.py
@@ -45,6 +45,7 @@ class LdapImportError(Exception):
 
 
 class DefaultUserException(Exception):
+    """An invalid action was attempted on the default user"""
     pass
 
 
diff --git a/kallithea/model/db.py b/kallithea/model/db.py
--- a/kallithea/model/db.py
+++ b/kallithea/model/db.py
@@ -43,6 +43,7 @@ from webob.exc import HTTPNotFound
 from pylons.i18n.translation import lazy_ugettext as _
 
 from kallithea import DB_PREFIX
+from kallithea.lib.exceptions import DefaultUserException
 from kallithea.lib.vcs import get_backend
 from kallithea.lib.vcs.utils.helpers import get_scm
 from kallithea.lib.vcs.exceptions import VCSError
@@ -526,6 +527,18 @@ class User(Base, BaseModel):
                                       self.user_id, self.username)
 
     @classmethod
+    def get_or_404(cls, id_, allow_default=True):
+        '''
+        Overridden version of BaseModel.get_or_404, with an extra check on
+        the default user.
+        '''
+        user = super(User, cls).get_or_404(id_)
+        if allow_default == False:
+            if user.username == User.DEFAULT_USER:
+                raise DefaultUserException
+        return user
+
+    @classmethod
     def get_by_username(cls, username, case_insensitive=False, cache=False):
         if case_insensitive:
             q = cls.query().filter(cls.username.ilike(username))
diff --git a/kallithea/tests/functional/test_admin_users.py b/kallithea/tests/functional/test_admin_users.py
--- a/kallithea/tests/functional/test_admin_users.py
+++ b/kallithea/tests/functional/test_admin_users.py
@@ -13,6 +13,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from sqlalchemy.orm.exc import NoResultFound
+from webob.exc import HTTPNotFound
 
 from kallithea.tests import *
 from kallithea.tests.fixture import Fixture
@@ -497,3 +498,81 @@ class TestAdminUsersController(TestContr
         self.checkSessionFlash(response, 'API key successfully reset')
         response = response.follow()
         response.mustcontain(no=[api_key])
+
+# TODO To be uncommented when pytest is the test runner
+#import pytest
+#from kallithea.controllers.admin.users import UsersController
+#class TestAdminUsersController_unittest(object):
+#    """
+#    Unit tests for the users controller
+#    These are in a separate class, not deriving from TestController (and thus
+#    unittest.TestCase), to be able to benefit from pytest features like
+#    monkeypatch.
+#    """
+#    def test_get_user_or_raise_if_default(self, monkeypatch):
+#        # flash complains about a nonexistent session
+#        def flash_mock(*args, **kwargs):
+#            pass
+#        monkeypatch.setattr(h, 'flash', flash_mock)
+#
+#        u = UsersController()
+#        # a regular user should work correctly
+#        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
+#        assert u._get_user_or_raise_if_default(user.user_id) == user
+#        # the default user should raise
+#        with pytest.raises(HTTPNotFound):
+#            u._get_user_or_raise_if_default(User.get_default_user().user_id)
+
+
+class TestAdminUsersControllerForDefaultUser(TestController):
+    """
+    Edit actions on the default user are not allowed.
+    Validate that they throw a 404 exception.
+    """
+    def test_edit_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user', id=user.user_id), status=404)
+
+    def test_edit_advanced_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
+
+    # API keys
+    def test_edit_api_keys_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
+
+    def test_add_api_keys_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
+                 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
+
+    def test_delete_api_keys_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
+                 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
+
+    # Permissions
+    def test_edit_perms_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
+
+    # E-mails
+    def test_edit_emails_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
+
+    # IP addresses
+    # Add/delete of IP addresses for the default user is used to maintain
+    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
+    def test_edit_ip_default_user(self):
+        self.log_user()
+        user = User.get_default_user()
+        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)


More information about the kallithea-general mailing list