Module: check_mk
Branch: master
Commit: 24aa11eb5401dce48a12e28f9cd5b70b41b3fd94
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=24aa11eb5401dc…
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Wed Jun 5 11:26:53 2013 +0200
Livestatus proxy seems to work
---
doc/treasures/liveproxy/liveproxyd | 60 +++++++++++++++++++++---------------
1 file changed, 35 insertions(+), 25 deletions(-)
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 4b2ac99..431debe 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -126,7 +126,7 @@ def initiate_connections():
def do_heartbeats():
now = time.time()
for sitename, sitestate in g_sites.items():
- if sitestate["state"] != "starting":
+ if sitestate["channels"]:
rate, timeout = sites[sitename]["heartbeat"]
channel = sitestate["heartbeat"]["channel"]
since = sitestate["heartbeat"]["since"]
@@ -170,7 +170,7 @@ def send_heartbeat(sitename, sitestate):
sitestate["heartbeat"]["channel"] = channel
return True
except Exception, e:
- log("Cannot send heartbeat to channel %s/%d: %s" % (sitename,
channel["socket"].filelo(), e))
+ log("Cannot send heartbeat to channel %s/%d: %s" % (sitename,
channel["socket"].fileno(), e))
channel["state"] = "error"
@@ -197,7 +197,6 @@ def complete_connections(writable):
channel["socket"].setblocking(1) # avoid signals from
interrupting us
log("Channel %s/%d successfully connected" % (sitename,
channel["socket"].fileno()))
channel["state"] = "ready"
- sitestate["state"] = "ready" # at least one
connection is up
except Exception, e:
log("Failed to connect channel %s/%d: %s" % (sitename,
channel["socket"].fileno(), e))
channel["state"] = "error"
@@ -254,20 +253,24 @@ def get_new_requests(readable):
(client["socket"] in readable or
client.get("nextrequest")):
try:
request = receive_request(sitename, client)
- if request and not request.startswith("GET") and not
request.startswith("COMMAND"):
- log("Invalid request [%s] from client %s/%d" % (
- request.replace("\n", "\\n"), sitename,
client["socket"].fileno()))
+ if not request:
client["state"] = "closed"
- elif request and not request.startswith("COMMAND") and
"ResponseHeader: fixed16\n" not in request:
- respond_client_with_error(client, "Invalid request, you must
specify ResponseHeader: fixed16.")
- elif request:
- client["state"] = "wait_for_channel"
- client["since"] = time.time()
- client["request"] = request
- # log("Get new request from %s/%d (%d bytes)" %
- # (sitename, client["socket"].fileno(),
len(request)))
else:
- client["state"] = "closed"
+ if sitestate["state"] == "starting":
+ respond_client_with_error(client, "Site is currently not
reachable.")
+ elif not request.startswith("GET") and not
request.startswith("COMMAND"):
+ log("Invalid request [%s] from client %s/%d" % (
+ request.replace("\n", "\\n"),
sitename, client["socket"].fileno()))
+ client["state"] = "closed"
+ elif not request.startswith("COMMAND") and
"ResponseHeader: fixed16\n" not in request:
+ respond_client_with_error(client,
+ "Invalid request, you must specify ResponseHeader:
fixed16.")
+ else:
+ client["state"] = "wait_for_channel"
+ client["since"] = time.time()
+ client["request"] = request
+ # log("Get new request from %s/%d (%d bytes)" %
+ # (sitename, client["socket"].fileno(),
len(request)))
except Exception, e:
if opt_debug:
raise
@@ -276,7 +279,7 @@ def get_new_requests(readable):
def distribute_requests():
for sitename, sitestate in g_sites.items():
- if sitestate["state"] == "busy":
+ if sitestate["state"] != "ready":
continue
waiting_clients = [ client for client in sitestate["clients"] if
client["state"] == "wait_for_channel"]
@@ -328,11 +331,13 @@ def respond_client_with_error(client, message):
except socket.error, e:
if e.errno == 4:
continue # Interrupted system call
+ else:
+ raise
except Exception, e:
if opt_debug:
raise
- log("Cannot send error message to client %s/%d: %s" % (
- sitename, client["socket"].fileno(), e))
+ log("Cannot send error message to client %d: %s" % (
+ client["socket"].fileno(), e))
client["state"] = "closed"
if client.get("channel"):
@@ -474,8 +479,10 @@ def receive_heartbeat(sitename, channel):
sitename, channel["socket"].fileno(), chunk))
disconnect_from_site(sitename)
else:
- # log("Channel %s/%d: received valid heartbeat" % (
- # sitename, channel["socket"].fileno()))
+ # Set site to ready if at least one other channel exists
+ if len(sitestate["channels"]) > 1:
+ sitestate["state"] = "ready"
+ sitestate["state"] = "ready" # at least one connection is
up
channel["state"] = "ready"
sitestate["heartbeat"]["since"] = time.time()
sitestate["heartbeat"]["channel"] = None
@@ -542,7 +549,7 @@ def garbage_collect_sockets():
if channel["state"] not in [ "error", "closed"
]]
def format_time(t):
- return "%s (%d secs ago)" % (
+ return "%s (%3d secs ago)" % (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)), time.time() -
t)
@@ -559,14 +566,16 @@ def dump_state():
sf.write(" Last failed connect: %s\n" %
format_time(sitestate["last_failed_connect"]))
sf.write(" Channels:\n")
for channel in sitestate["channels"]:
- sf.write(" %d - %s, client: %s\n" %
+ sf.write(" %3d - %-18s- client: %4s - since: %s\n" %
(channel["socket"].fileno(), channel["state"],
- channel.get("client") and
channel["client"]["socket"].fileno() or "none"))
+ channel.get("client") and
channel["client"]["socket"].fileno() or "none",
+ format_time(channel["since"])))
sf.write(" Clients:\n")
for client in sitestate["clients"]:
- sf.write(" %d - %s, channel: %s\n" %
+ sf.write(" %3d - %-18s- channel: %4s - since: %s\n" %
(client["socket"].fileno(), client["state"],
- client.get("channel") and
client["channel"]["socket"].fileno() or "none"))
+ client.get("channel") and
client["channel"]["socket"].fileno() or "none",
+ format_time(client["since"])))
sf.write(" Heartbeat:\n")
hb = sitestate["heartbeat"]
@@ -814,6 +823,7 @@ try:
signal.signal(15, signal_handler) # TERM
signal.signal(10, signal_handler) # USR1
signal.signal(12, signal_handler) # USR2
+ signal.signal(13, signal.SIG_IGN) # PIPE
# Now let's go...
liveproxyd_run()