Module: check_mk
Branch: master
Commit: ca2fa5522ba5c2a2c0c09e9d5ce2ce636308d336
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=ca2fa5522ba5c2a2c0c09e9d5ce2ce636308d336
Author: Lars Michelsen <lm(a)mathias-kettner.de>
Date: Fri Sep 29 10:27:20 2017 +0200
EC: Cleaned up termination handling (no self kill, clean thread exit)
Change-Id: Ifa0c5dcae322f0ad8774915e41000e787144f75e
---
bin/mkeventd | 114 ++++++++++++++++++++++++++++++++---------------------------
1 file changed, 62 insertions(+), 52 deletions(-)
diff --git a/bin/mkeventd b/bin/mkeventd
index 91c50d0..46e3721 100755
--- a/bin/mkeventd
+++ b/bin/mkeventd
@@ -326,6 +326,48 @@ class ECLock(object):
+class ECServerThread(threading.Thread):
+ def __init__(self, name):
+ super(ECServerThread, self).__init__(name=name)
+ self._terminate_event = threading.Event()
+ self.logger = logger.getChild(name)
+
+
+ def run(self):
+ self.logger.info("Starting up")
+
+ while not self._shal_terminate():
+ try:
+ with cmk.profile.Profile(enabled=opt_profile.get(self.name),
+ profile_file=cmk.paths.omd_root +
"/var/mkeventd/%s.profile" % self.name):
+ self.serve()
+ except Exception:
+ self.logger.exception("Exception in %s server" % self.name)
+ if opt_debug:
+ raise
+ time.sleep(1)
+
+ self.logger.info("Terminated")
+
+
+ def _shal_terminate(self):
+ return self._terminate_event.is_set()
+
+
+ def terminate(self):
+ self._terminate_event.set()
+
+
+ def serve(self):
+ raise NotImplementedError()
+
+
+def terminate():
+ g_terminate_main_event.set()
+ g_status_server.terminate()
+ g_event_server.terminate()
+
+
def bail_out(reason):
logger.error("FATAL ERROR: %s" % reason)
sys.exit(1)
@@ -1274,20 +1316,17 @@ class Perfcounters(object):
# | Verarbeitung und Klassifizierung von eingehenden Events. |
# '----------------------------------------------------------------------'
-class EventServer(threading.Thread):
+class EventServer(ECServerThread):
month_names = { "Jan": 1, "Feb": 2, "Mar": 3,
"Apr": 4, "May": 5, "Jun": 6,
"Jul": 7, "Aug": 8, "Sep": 9,
"Oct": 10, "Nov": 11, "Dec": 12, }
def __init__(self):
super(EventServer, self).__init__(name="EventServer")
- # TODO: Change from daemon mode to explicit and clean termination
- self.setDaemon(True)
self._syslog = None
self._syslog_tcp = None
self._snmptrap = None
self._mib_resolver = None
- self.logger = logger.getChild("EventServer")
self.create_pipe()
self.open_eventsocket()
@@ -1474,27 +1513,6 @@ class EventServer(threading.Thread):
self._eventsocket = None
- def run(self):
- while not self._should_terminate:
- with cmk.profile.profile(enabled=opt_profile.get("event"),
- profile_file=cmk.paths.omd_root +
"/var/mkeventd/event.profile"):
- self.run_loop()
-
-
- def run_loop(self):
- c = 0
- while True:
- c += 1
- try:
- self.serve()
-
- except Exception:
- self.logger.exception("Exception in event server:\n%s" % e)
- if opt_debug:
- raise
- time.sleep(1)
-
-
def open_pipe(self):
# Beware: we must open the pipe also for writing. Otherwise
# we will see EOF forever after one writer has finished and
@@ -1698,7 +1716,7 @@ class EventServer(threading.Thread):
# fd to (fileobject, data)
client_sockets = {}
select_timeout = 1
- while True:
+ while not self._shal_terminate():
try:
readable = select.select(listen_list + client_sockets.keys(), [], [],
select_timeout)[0]
except select.error as e:
@@ -3105,16 +3123,12 @@ class EventServer(threading.Thread):
# | Beantworten von Status- und Kommandoanfragen über das UNIX-Socket |
# '----------------------------------------------------------------------'
-class StatusServer(threading.Thread):
+class StatusServer(ECServerThread):
def __init__(self):
super(StatusServer, self).__init__(name="StatusServer")
- # TODO: Change from daemon mode to explicit and clean termination
- self.setDaemon(True)
self._socket = None
self._tcp_socket = None
self._reopen_sockets = False
- self._should_terminate = False
- self.logger = logger.getChild("StatusServer")
def open_sockets(self):
@@ -3179,24 +3193,8 @@ class StatusServer(threading.Thread):
self._reopen_sockets = True
- # TODO: Wenn das hier komplexer wird, machen wir eine
- # Klassenvererbung
- def run(self):
- while not self._should_terminate:
- try:
- with cmk.profile.profile(enabled=opt_profile.get("status"),
- profile_file=cmk.paths.omd_root +
"/var/mkeventd/status.profile"):
- self.serve()
- except Exception:
- self.logger.exception("Exception in status server")
- if opt_debug:
- raise
- time.sleep(1)
- self.logger.info("Killing myself with signal 15")
- os.kill(os.getpid(), 15)
-
def serve(self):
- while not self._should_terminate:
+ while not self._shal_terminate():
try:
client_socket = None
addr_info = None
@@ -3330,7 +3328,7 @@ class StatusServer(threading.Thread):
self.handle_command_reload()
elif command == "SHUTDOWN":
self.logger.info("Going to shut down")
- self._should_terminate = True
+ terminate()
elif command == "REOPENLOG":
self.handle_command_reopenlog()
elif command == "FLUSH":
@@ -3602,7 +3600,7 @@ def run_eventd():
next_statistics = now + g_config["statistics_interval"]
next_replication = 0 # force immediate replication after restart
- while True:
+ while not g_terminate_main_event.is_set():
try:
# Wait until either housekeeping or retention is due, but at
# maximum 60 seconds. That way changes of the interval from a very
@@ -3640,7 +3638,7 @@ def run_eventd():
logger.info("Received SIGHUP - going to reload configuration")
else:
logger.info("Signalled to death by signal %d" % e._signum)
- break
+ terminate()
except Exception as e:
logger.exception("Exception in main thread:\n%s" % e)
@@ -3648,6 +3646,10 @@ def run_eventd():
raise
time.sleep(1)
+ # Now wait for termination of the server threads
+ g_event_server.join()
+ g_status_server.join()
+
#.
# .--EventStatus---------------------------------------------------------.
@@ -4825,6 +4827,8 @@ lock_eventstatus = ECLock("eventstatus")
lock_configuration = ECLock("configuration")
lock_logging = ECLock("history")
+g_terminate_main_event = threading.Event()
+
if __name__ == "__main__":
os.unsetenv("LANG")
@@ -4857,7 +4861,11 @@ if __name__ == "__main__":
elif o == '--snmptrap-fd':
opt_snmptrap_fd = int(a)
elif o.startswith('--profile-'):
- opt_profile[o[10:]] = True
+ what = o.split("-")[-1]
+ if what == "status":
+ opt_profile["StatusServer"] = True
+ elif what == "event":
+ opt_profile["EventServer"] = True
cmk.log.open_log(sys.stderr)
cmk.log.set_verbosity(opt_verbose)
@@ -4946,6 +4954,8 @@ if __name__ == "__main__":
# We reach this point, if the server has been killed by
# a signal or hitting Ctrl-C (in foreground mode)
+ # TODO: Move this cleanup stuff to the classes that are responsible for these resources
+
# Remove event pipe and drain it, so that we make sure
# that processes (syslog, etc) will not hang when trying
# to write into the pipe.