Module: check_mk
Branch: master
Commit: c0f162de2cab90ec10e882578fb90cc269fd32bc
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=c0f162de2cab90…
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 11:14:38 2013 +0200
Livestatus proxy: forwarding implemented
---
doc/treasures/liveproxy/liveproxyd | 107 +++++++++++++++++++++++++++++++-----
1 file changed, 92 insertions(+), 15 deletions(-)
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index f08771e..d1313e4 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -38,6 +38,8 @@ def liveproxyd_run():
readable, writable = do_select()
complete_connections(writable)
accept_new_clients(readable)
+ get_new_requests(readable)
+ distribute_requests()
except MKSignalException, e:
@@ -81,7 +83,7 @@ def complete_connections(writable):
for channel in sitestate["channels"]:
if channel["state"] == "connecting" and
channel["socket"] in writable:
log("Channel %s/%d successfully connected" % (sitename,
channel["socket"].fileno()))
- channel["state"] = "connected"
+ channel["state"] = "ready"
# Master/Mega/Central select(). We are going to be the select() master. Harhar.
def do_select():
@@ -97,10 +99,99 @@ def do_select():
# new client connections
read_fds.append(sitestate["socket"])
+ # new requests from existing clients
+ for client in sitestate["clients"]:
+ if client["state"] == "idle":
+ read_fds.append(client["socket"])
+
r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
return r_able, w_able
+def accept_new_clients(readable):
+ for sitename, sitestate in g_sites.items():
+ if sitestate["socket"] in readable:
+ print readable
+ try:
+ s, addrinfo = sitestate["socket"].accept()
+ log("Accepted new client %s/%d" % (sitename, s.fileno()))
+ sitestate["clients"].append({"socket" : s,
"state" : "idle", "since" : time.time()})
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Failed to accept new client for %s: %s" % (sitename, e))
+
+def get_new_requests(readable):
+ for sitename, sitestate in g_sites.items():
+ for client in sitestate["clients"]:
+ if client["state"] == "idle" and \
+ client["socket"] in readable:
+ try:
+ request = receive_request(client["socket"])
+ client["state"] = "wait_for_channel"
+ client["since"] = time.time()
+ client["request"] = request
+ log("Get new request from %s/%d (%d bytes)" %
+ (sitename, client["socket"].fileno(), len(request)))
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Cannot read request from client %s/%d: %s" %
+ (sitename, client["socket"].fileno(), e))
+
+def distribute_requests():
+ for sitename, sitestate in g_sites.items():
+ waiting_clients = [ client for client in sitestate["clients"] if
client["state"] == "wait_for_channel"]
+ # Sort after waiting time, we should be fair to all...
+ waiting_clients.sort(cmp = lambda a, b: cmp(a["since"],
b["since"]))
+
+ for channel in sitestate["channels"]:
+ if not waiting_clients:
+ break
+ if channel["state"] == "ready":
+ client = waiting_clients[0]
+ del waiting_clients[0]
+ forward_request(sitename, client, channel)
+
+def forward_request(sitename, client, channel):
+ cls = client["socket"]
+ chs = channel["socket"]
+ log("Forwarding request from client %s/%d to channel %s/%d" % (
+ sitename, cls.fileno(), sitename, chs.fileno()))
+ try:
+ chs.send(client["request"])
+ client["state"] = "wait_for_response"
+ channel["state"] = "busy"
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Error: %s" % e)
+ close_client_with_error(client, str(e))
+
+
+def close_client_with_error(client, message):
+ try:
+ client["socket"].send("400%12d\n%s\n" % (len(message) + 2,
message))
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Cannot send error message to client %s/%d: %s" % (
+ sitename, client["socket"].fileno(), e))
+
+ client["socket"] = None
+ client["state"] = "closed"
+
+
+
+# TODO: one malicious client can hang the whole proxy. In order
+# to prevent this we'd need partial requests...
+def receive_request(sock):
+ textbody = ""
+ while not textbody.endswith("\n\n"):
+ textbody += sock.recv(65536)
+ return textbody
+
+
def open_client_sockets():
if not os.path.exists(opt_socketdir):
os.makedirs(opt_socketdir)
@@ -125,20 +216,6 @@ def create_unix_socket(sitename):
g_sites[sitename]["socket"] = s
-def accept_new_clients(readable):
- for sitename, sitestate in g_sites.items():
- if sitestate["socket"] in readable:
- print readable
- try:
- s, addrinfo = sitestate["socket"].accept()
- log("Accepted new client %s/%d" % (sitename, s.fileno()))
- sitestate["clients"].append(s)
- except Exception, e:
- if opt_debug:
- raise
- log("Failed to accept new client for %s: %s" % (sitename, e))
-
-
#.
# .--Daemon/main---------------------------------------------------------.
# | ____ __ _ |