Module: check_mk
Branch: master
Commit: 66a3f4148d9e74842323e300d7c3d5282c321873
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=66a3f4148d9e74…
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 11:41:25 2013 +0200
liveproxyd: first successful forwarding
---
doc/treasures/liveproxy/liveproxyd | 130 +++++++++++++++++++++++++++++++-----
1 file changed, 114 insertions(+), 16 deletions(-)
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index d1313e4..3cb14ce 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -32,24 +32,26 @@ g_sites = {
def liveproxyd_run():
open_client_sockets()
- try:
- while True:
+ while True:
+ try:
initiate_connections()
readable, writable = do_select()
complete_connections(writable)
accept_new_clients(readable)
get_new_requests(readable)
distribute_requests()
-
-
- except MKSignalException, e:
- log("Got signal %d." % e._signum)
- if e._signum in [ 2, 3, 15 ]:
- sys.exit(0)
- elif e._signum == 10:
- do_restart()
- elif e._signum == 1:
- do_reload()
+ get_responses(readable)
+
+ except MKSignalException, e:
+ log("Got signal %d." % e._signum)
+ if e._signum in [ 2, 3, 15 ]:
+ sys.exit(0)
+ elif e._signum == 10:
+ do_restart()
+ elif e._signum == 12:
+ dump_state()
+ elif e._signum == 1:
+ do_reload()
def initiate_connections():
# Create new channels to target sites. Nonblocking!
@@ -104,6 +106,11 @@ def do_select():
if client["state"] == "idle":
read_fds.append(client["socket"])
+ # Responses from channels
+ for channel in sitestate["channels"]:
+ if channel["state"] == "busy":
+ read_fds.append(channel["socket"])
+
r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
return r_able, w_able
@@ -162,14 +169,16 @@ def forward_request(sitename, client, channel):
chs.send(client["request"])
client["state"] = "wait_for_response"
channel["state"] = "busy"
+ client["channel"] = channel
+ channel["client"] = client
except Exception, e:
if opt_debug:
raise
log("Error: %s" % e)
- close_client_with_error(client, str(e))
+ respond_client_with_error(client, str(e))
-def close_client_with_error(client, message):
+def respond_client_with_error(client, message):
try:
client["socket"].send("400%12d\n%s\n" % (len(message) + 2,
message))
except Exception, e:
@@ -177,9 +186,15 @@ def close_client_with_error(client, message):
raise
log("Cannot send error message to client %s/%d: %s" % (
sitename, client["socket"].fileno(), e))
+ client["state"] = "closed"
+ client["socket"] = None
+
+ if "channel" in client:
+ channel = client["channel"]
+ del channel["client"]
+ del client["channel"]
- client["socket"] = None
- client["state"] = "closed"
+ client["state"] = "idle"
@@ -192,6 +207,72 @@ def receive_request(sock):
return textbody
+def get_responses(readable):
+ for sitename, sitestate in g_sites.items():
+ for channel in sitestate["channels"]:
+ if channel["state"] == "busy" and channel["socket"] in readable:
+ receive_response(sitename, channel)
+
+
+def receive_response(sitename, channel):
+ client = channel["client"]
+ # We always assume fixed16 as response header!
+ old_response = channel.get("response", "")
+ try:
+ chunk = channel["socket"].recv(65536)
+ response = old_response + chunk
+ channel["response"] = response
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Cannot read response from %s/%d: %s" %
+ (sitename, channel["socket"].fileno(), e))
+ channel["state"] = "error"
+ channel["socket"] = None
+ respond_client_with_error(client, "Error receiving response from target site %s: %s" %
+ (sitenam, e))
+
+ if len(response) < 16: # header not yet complete
+ return
+
+ try:
+ bodylength = int(response[3:15])
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Malformed response header from cannel %s/%d" % sitename, channel["socket"].fileno())
+ respond_client_with_error(client, "Malformed response from site %s" % sitename)
+ channel["state"] = error
+ channel["socket"] = None
+
+ if len(response) > bodylength + 16:
+ log("Too large response on channel %s/%d (%d exceeding bytes)" % (
+ sitename, channel["socket"].fileno(), (len(response) - bodylength - 16)))
+ respond_client_with_error(client, "To long response from site %s" % sitename)
+ channel["state"] = error
+ channel["socket"] = None
+
+ elif len(response) < bodylength + 16:
+ return
+
+ # Response complete
+ channel["state"] = "ready"
+ del channel["client"]
+ del client["channel"]
+
+ try:
+ client["socket"].send(response)
+ client["state"] = "idle"
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Cannot forward response from client %s/%d to client %s/%d: %s",
+ (sitename, channel["socket"].fileno(), sitename, client["socket"].fileno(), e))
+ client["state"] = "error"
+ client["socket"] = None
+
+
+
def open_client_sockets():
if not os.path.exists(opt_socketdir):
os.makedirs(opt_socketdir)
@@ -216,6 +297,22 @@ def create_unix_socket(sitename):
g_sites[sitename]["socket"] = s
+def dump_state():
+ log("Current connection state:")
+ for sitename, sitestate in g_sites.items():
+ log("[%s]" % sitename)
+ log(" Channels:")
+ for channel in sitestate["channels"]:
+ log(" %s/%d - %s, client: %s" %
+ (sitename, channel["socket"].fileno(), channel["state"], channel.get("client")))
+ log(" Clients:")
+ for client in sitestate["clients"]:
+ log(" %s/%d - %s, channel: %s" %
+ (sitename, client["socket"].fileno(), client["state"], client.get("channel")))
+
+
+
+
#.
# .--Daemon/main---------------------------------------------------------.
# | ____ __ _ |
@@ -429,6 +526,7 @@ try:
signal.signal(3, signal_handler) # QUIT
signal.signal(15, signal_handler) # TERM
signal.signal(10, signal_handler) # USR1
+ signal.signal(12, signal_handler) # USR2
# Now let's go...
liveproxyd_run()