Module: check_mk
Branch: master
Commit: a957ffa693e0c289d03bb306ff919a125557eb73
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=a957ffa693e0c289d03bb306ff919a125557eb73
Author: Lars Michelsen <lm(a)mathias-kettner.de>
Date: Mon Oct 22 09:22:37 2018 +0200
Centralized htpasswd file operations
* Encapsulated htpasswd file IO and line parsing to dedicated helper class
* Added tests for this helper class
CMK-1151
Change-Id: Iebf3e3644ee2a4cc462728766e6e3ddec0f92768
---
cmk/gui/plugins/userdb/htpasswd.py | 63 ++++++++++++++++------
cmk/gui/userdb.py | 11 +---
.../unit/cmk/gui/test_userdb_htpasswd_connector.py | 42 +++++++++++++++
3 files changed, 90 insertions(+), 26 deletions(-)
diff --git a/cmk/gui/plugins/userdb/htpasswd.py b/cmk/gui/plugins/userdb/htpasswd.py
index 28cb8af..a5ca02e 100644
--- a/cmk/gui/plugins/userdb/htpasswd.py
+++ b/cmk/gui/plugins/userdb/htpasswd.py
@@ -27,6 +27,8 @@
import time
import crypt
import os
+import pathlib2 as pathlib
+from typing import Dict, Text # pylint: disable=unused-import
import cmk.store as store
import cmk.paths
@@ -36,6 +38,45 @@ from cmk.gui.i18n import _
from cmk.gui.exceptions import MKUserError
from . import UserConnector, user_connector_registry
+class Htpasswd(object):
+ """Thin wrapper for loading and saving the htpasswd file"""
+
+ def __init__(self, path):
+ # type: (pathlib.Path) -> None
+ super(Htpasswd, self).__init__()
+ self._path = path
+
+
+ def load(self):
+ # type: (None) -> Dict[Text, Text]
+ """Loads the contents of a valid htpasswd file into a dictionary and returns the dictionary"""
+ entries = {}
+
+ with self._path.open(encoding="utf-8") as f:
+ for l in f:
+ if ':' not in l:
+ continue
+
+ user_id, pw_hash = l.split(':', 1)
+ entries[user_id] = pw_hash.rstrip('\n')
+
+ return entries
+
+
+ def exists(self, user_id):
+ """Whether or not a user exists according to the htpasswd file"""
+ # type: (Text) -> bool
+ return user_id in self.load()
+
+
+ def save(self, entries):
+ # type: (Dict[Text, Text]) -> None
+ """Save the dictionary entries (unicode username and hash) to the htpasswd file"""
+ output = "\n".join("%s:%s" % entry for entry in sorted(entries.iteritems())) + "\n"
+ store.save_file("%s" % self._path, output.encode("utf-8"))
+
+
+
def encrypt_password(password, salt=None, prefix="1"):
if not salt:
salt = "%06d" % (1000000 * (time.time() % 1.0))
@@ -64,7 +105,7 @@ class HtpasswdUserConnector(UserConnector):
#
def check_credentials(self, user_id, password):
- users = self.load_htpasswd()
+ users = Htpasswd(pathlib.Path(cmk.paths.htpasswd_file)).load()
if user_id not in users:
return None # not existing user, skip over
@@ -80,19 +121,6 @@ class HtpasswdUserConnector(UserConnector):
return os.path.isfile(cmk.paths.var_dir + "/web/" + user_id.encode("utf-8") + "/automation.secret")
- # Loads the contents of a valid htpasswd file into a dictionary
- # and returns the dictionary
- def load_htpasswd(self):
- creds = {}
-
- for line in open(cmk.paths.htpasswd_file, 'r'):
- if ':' in line:
- user_id, pwhash = line.split(':', 1)
- creds[user_id.decode("utf-8")] = pwhash.rstrip('\n')
-
- return creds
-
-
# Validate hashes taken from the htpasswd file. This method handles
# crypt() and md5 hashes. This should be the common cases in the
# used htpasswd files.
@@ -109,7 +137,7 @@ class HtpasswdUserConnector(UserConnector):
# users from htpasswd are lost. If you start managing users with
# WATO, you should continue to do so or stop doing to for ever...
# Locked accounts get a '!' before their password. This disable it.
- output = ""
+ creds = {}
for uid, user in users.items():
# only process users which are handled by htpasswd connector
@@ -121,6 +149,7 @@ class HtpasswdUserConnector(UserConnector):
locksym = '!'
else:
locksym = ""
- output += "%s:%s%s\n" % (uid.encode("utf-8"), locksym, user["password"])
- store.save_file(cmk.paths.htpasswd_file, output)
+ creds[uid] = "%s%s" % (locksym, user["password"])
+
+ Htpasswd(cmk.paths.htpasswd_file).save(creds)
diff --git a/cmk/gui/userdb.py b/cmk/gui/userdb.py
index 8f7317a..36fac8d 100644
--- a/cmk/gui/userdb.py
+++ b/cmk/gui/userdb.py
@@ -49,6 +49,7 @@ from cmk.gui.i18n import _
from cmk.gui.globals import html
import cmk.gui.plugin_registry
import cmk.gui.plugins.userdb
+from cmk.gui.plugins.userdb.htpasswd import Htpasswd
from cmk.gui.plugins.userdb.utils import (
user_attribute_registry,
@@ -240,7 +241,7 @@ def user_exists(username):
if _user_exists_according_to_profile(username):
return True
- return _user_exists_htpasswd(username)
+ return Htpasswd(cmk.paths.htpasswd_file).exists(username)
def _user_exists_according_to_profile(username):
@@ -249,14 +250,6 @@ def _user_exists_according_to_profile(username):
or os.path.exists(base_path + "serial.mk")
-def _user_exists_htpasswd(username):
- for line in open(cmk.paths.htpasswd_file):
- l = line.decode("utf-8")
- if l.startswith("%s:" % username):
- return True
- return False
-
-
def user_locked(username):
users = load_users()
return users[username].get('locked', False)
diff --git a/tests/unit/cmk/gui/test_userdb_htpasswd_connector.py b/tests/unit/cmk/gui/test_userdb_htpasswd_connector.py
new file mode 100644
index 0000000..e4977db
--- /dev/null
+++ b/tests/unit/cmk/gui/test_userdb_htpasswd_connector.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import pytest
+import pathlib2 as pathlib
+
+import cmk.gui.plugins.userdb.htpasswd as htpasswd
+
+@pytest.fixture()
+def htpasswd_file(tmpdir):
+ htpasswd_file = tmpdir.join("htpasswd")
+ htpasswd_file.write((
+ u"bärnd:NEr3kqi287FQc\n"
+ u"cmkadmin:NEr3kqi287FQc\n"
+ u"locked:!NEr3kqi287FQc\n"
+ ).encode("utf-8"), mode="wb")
+ return pathlib.Path(htpasswd_file)
+
+
+def test_htpasswd_exists(htpasswd_file):
+ assert htpasswd.Htpasswd(htpasswd_file).exists(u"cmkadmin")
+ assert htpasswd.Htpasswd(htpasswd_file).exists(u"locked")
+ assert not htpasswd.Htpasswd(htpasswd_file).exists(u"not-existing")
+ assert not htpasswd.Htpasswd(htpasswd_file).exists(u"")
+ assert htpasswd.Htpasswd(htpasswd_file).exists(u"bärnd")
+
+
+def test_htpasswd_load(htpasswd_file):
+ credentials = htpasswd.Htpasswd(htpasswd_file).load()
+ assert credentials[u"cmkadmin"] == "NEr3kqi287FQc"
+ assert isinstance(credentials[u"cmkadmin"], unicode)
+ assert credentials[u"bärnd"] == "NEr3kqi287FQc"
+
+
+def test_htpasswd_save(htpasswd_file):
+ credentials = htpasswd.Htpasswd(htpasswd_file).load()
+
+ saved_file = htpasswd_file.with_suffix(".saved")
+ htpasswd.Htpasswd(saved_file).save(credentials)
+
+ assert htpasswd_file.open(encoding="utf-8").read() \
+ == saved_file.open(encoding="utf-8").read()