Module: check_mk
Branch: master
Commit: a75f4f85f9a14f4ea32886fc05097447688f196b
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=a75f4f85f9a14f…
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Wed Oct 9 22:36:50 2013 +0200
FIX: liveproxyd: fix handing proxy for larger responses on RH6.4
---
ChangeLog | 1 +
doc/treasures/liveproxy/liveproxyd | 59 +++++++++++++++++++++++++-----------
2 files changed, 43 insertions(+), 17 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index 267e43f..2221ff0 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -35,6 +35,7 @@
Livestatus-Proxy:
* FIX: fix exception when printing error message
* FIX: honor wait time (now called cooling period) after failed TCP connection
+ * FIX: fix hanging if client cannot accept large chunks (seen on RH6.4)
WATO:
* Rule "State and count of processes": New configuration options:
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index def77b3..a5ff33b 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -81,6 +81,7 @@ def liveproxyd_run():
get_new_requests(readable)
distribute_requests()
get_responses(readable) # also heartbeats
+ send_responses(writable)
handle_client_timeouts()
garbage_collect_sockets()
@@ -219,11 +220,15 @@ def do_select(timeout):
# new client connections
read_fds.append(sitestate["socket"])
- # new requests from existing clients
for client in sitestate["clients"]:
+ # new requests from existing clients
if client["state"] == "idle":
read_fds.append(client["socket"])
+ # clients ready to receive a response
+ if client["state"] == "response":
+ write_fds.append(client["socket"])
+
# Responses from channels, also heartbeat responses
for channel in sitestate["channels"]:
if channel["state"] in [ "busy", "heartbeat"
]:
@@ -242,7 +247,7 @@ def accept_new_clients(readable):
try:
s, addrinfo = sitestate["socket"].accept()
s.setblocking(1)
- log("Accepted new client %s/%d" % (sitename, s.fileno()))
+ # log("Accepted new client %s/%d" % (sitename, s.fileno()))
sitestate["clients"].append({"socket" : s,
"state" : "idle", "since" : time.time()})
except Exception, e:
if opt_debug:
@@ -367,7 +372,7 @@ def receive_request(sitename, client):
raise
if not chunk:
- log("Client %s/%d closed connection." % (sitename,
client["socket"].fileno()))
+ # log("Client %s/%d closed connection." % (sitename,
client["socket"].fileno()))
return None
end = request.index("\n\n")
client["nextrequest"] = request[end+2:]
@@ -445,21 +450,41 @@ def receive_response(sitename, channel):
if client:
del channel["client"]
del client["channel"]
+ client["response"] = response
+ client["response_offset"] = 0
+ client["state"] = "response"
- try:
- # ACHTUNG: Beim senden an den Client können wir blockieren, wenn
- # der Empfänger uns ausbremst. Dürfen wir aber nicht. Wir brauchen
- # eine Queue, müssen mit select() warten, usw.
- client["socket"].send(response)
- # log("HIRN: An client: [%s]" %
response[16:80].replace("\n", ""))
- client["state"] = "idle"
- client["request"] = ""
- except Exception, e:
- if opt_debug:
- raise
- log("Cannot forward response from client %s/%d to client %s/%d:
%s",
- (sitename, channel["socket"].fileno(), sitename,
client["socket"].fileno(), e))
- client["state"] = "error"
+def send_responses(writable):
+ for sitename, sitestatus in g_sites.items():
+ for client in sitestatus["clients"]:
+ if client["state"] == "response" and
client["socket"] in writable:
+
+ try:
+ # ACHTUNG: Beim senden an den Client können wir blockieren, wenn
+ # der Empfänger uns ausbremst. Dürfen wir aber nicht. Wir brauchen
+ # eine Queue, müssen mit select() warten, usw.
+ offset = client["response_offset"]
+ chunk = client["response"][offset:offset + 8192]
+ client["socket"].setblocking(0) # TEST TEST TEST
+ bytes_sent = client["socket"].send(chunk)
+ client["socket"].setblocking(1)
+ if bytes_sent <= 0:
+ raise Exception("Could not send any bytes of response to
client")
+ if offset + bytes_sent == len(client["response"]):
+ client["state"] = "idle"
+ del client["response"]
+ del client["response_offset"]
+ else:
+ client["response_offset"] += bytes_sent
+
+ except Exception, e:
+ if opt_debug:
+ raise
+ log("Cannot forward next %d bytes of response to client %s/%d:
%s" %
+ (len(chunk), sitename, client["socket"].fileno(), e))
+ client["state"] = "error"
+ del client["response"]
+ del client["response_offset"]
def receive_heartbeat(sitename, channel):
sitestate = g_sites[sitename]