Module: check_mk
Branch: master
Commit: d17e77b8fd0d5b8218249a095343262c2244034d
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=d17e77b8fd0d5b8218249a095343262c2244034d
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 12:03:12 2013 +0200
liveproxyd: first working multisite
---
doc/treasures/liveproxy/liveproxyd | 43 +++++++++++++++++++++---------------
1 file changed, 25 insertions(+), 18 deletions(-)
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 3a68359..0c2a371 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -34,14 +34,15 @@ def liveproxyd_run():
while True:
try:
+ dump_state()
initiate_connections()
- readable, writable = do_select()
+ readable, writable = do_select(5.0)
complete_connections(writable)
accept_new_clients(readable)
get_new_requests(readable)
distribute_requests()
get_responses(readable)
- # garbage_collect_sockets()
+ garbage_collect_sockets()
except MKSignalException, e:
log("Got signal %d." % e._signum)
@@ -89,7 +90,7 @@ def complete_connections(writable):
channel["state"] = "ready"
# Master/Mega/Central select(). We are going to be the select() master. Harhar.
-def do_select():
+def do_select(timeout):
read_fds = []
write_fds = []
@@ -112,7 +113,7 @@ def do_select():
if channel["state"] == "busy":
read_fds.append(channel["socket"])
- r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
+ r_able, w_able, x_able = select.select(read_fds, write_fds, [], timeout)
return r_able, w_able
@@ -143,7 +144,6 @@ def get_new_requests(readable):
(sitename, client["socket"].fileno(),
len(request)))
else:
client["state"] = "closed"
- client["socket"] = None
except Exception, e:
if opt_debug:
raise
@@ -191,7 +191,6 @@ def respond_client_with_error(client, message):
log("Cannot send error message to client %s/%d: %s" % (
sitename, client["socket"].fileno(), e))
client["state"] = "closed"
- client["socket"] = None
if "channel" in client:
channel = client["channel"]
@@ -236,7 +235,6 @@ def receive_response(sitename, channel):
log("Cannot read response from %s/%d: %s" %
(sitename, channel["socket"].fileno(), e))
channel["state"] = "error"
- channel["socket"] = None
respond_client_with_error(client, "Error receiving response from target site
%s: %s" %
(sitenam, e))
@@ -250,21 +248,23 @@ def receive_response(sitename, channel):
raise
log("Malformed response header from cannel %s/%d" % sitename,
channel["socket"].fileno())
respond_client_with_error(client, "Malformed response from site %s" %
sitename)
- channel["state"] = error
- channel["socket"] = None
+ channel["state"] = "error"
+ channel["response"] = ""
if len(response) > bodylength + 16:
- log("Too large response on channel %s/%d (%d exceeding bytes)" % (
- sitename, channel["socket"].fileno(), (len(response) - bodylength -
16)))
+ log("Too large response on channel %s/%d (%d exceeding bytes: [%s])" %
(
+ sitename, channel["socket"].fileno(), len(response) - bodylength -
16, response[bodylength + 16:]))
respond_client_with_error(client, "To long response from site %s" %
sitename)
- channel["state"] = error
- channel["socket"] = None
+ channel["state"] = "error"
+ channel["response"] = ""
+
elif len(response) < bodylength + 16:
return
# Response complete
channel["state"] = "ready"
+ channel["response"] = ""
del channel["client"]
del client["channel"]
@@ -277,7 +277,6 @@ def receive_response(sitename, channel):
log("Cannot forward response from client %s/%d to client %s/%d: %s",
(sitename, channel["socket"].fileno(), sitename,
client["socket"].fileno(), e))
client["state"] = "error"
- client["socket"] = None
@@ -304,6 +303,14 @@ def create_unix_socket(sitename):
g_sites[sitename]["socket"] = s
+def garbage_collect_sockets():
+ for sitename, sitestate in g_sites.items():
+ sitestate["channels"] = [
+ channel for channel in sitestate["channels"]
+ if channel["state"] not in [ "error", "closed"
]]
+ sitestate["clients"] = [
+ channel for channel in sitestate["clients"]
+ if channel["state"] not in [ "error", "closed"
]]
def dump_state():
log("Current connection state:")
@@ -311,12 +318,12 @@ def dump_state():
log("[%s]" % sitename)
log(" Channels:")
for channel in sitestate["channels"]:
- log(" %s/%d - %s, client: %s" %
- (sitename, channel["socket"].fileno(),
channel["state"], channel.get("client") and
channel["client"]["socket"].fileno() or "none"))
+ log(" %d - %s, client: %s" %
+ (channel["socket"].fileno(), channel["state"],
channel.get("client") and
channel["client"]["socket"].fileno() or "none"))
log(" Clients:")
for client in sitestate["clients"]:
- log(" %s/%d - %s, channel: %s" %
- (sitename, client["socket"].fileno(),
client["state"], client.get("channel") and
client["channel"]["socket"].fileno() or "none"))
+ log(" %d - %s, channel: %s" %
+ (client["socket"].fileno(), client["state"],
client.get("channel") and
client["channel"]["socket"].fileno() or "none"))