Module: check_mk
Branch: master
Commit: ef958a73ac5d92c6b2aa96790b3b3618a9b00d33
URL:
http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=ef958a73ac5d92c6b2aa96790b3b3618a9b00d33
Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 15:29:39 2013 +0200
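liveproxy: fixed several edge cases
- handle several requests arriving in a single read (e.g. a command followed by a wait query)
- do not wait for a response after forwarding a COMMAND request
- always keep one channel free for heartbeats; mark the site as busy otherwise
- reject site configurations with fewer than two channels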
---
doc/treasures/liveproxy/liveproxyd | 78 +++++++++++++++++++++---------------
1 file changed, 45 insertions(+), 33 deletions(-)
diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 19df726..0e746c0 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -21,7 +21,7 @@ sites = {}
# State of all sites
g_sites = {
# "mysite" : {
-# "state" : "starting" / "up"
+# "state" : "starting" / "up" / "busy"
# "socket" : -> UNIX server socket,
# "clients" : [ ... ],
# "channels" : [ ... ],
@@ -84,7 +84,6 @@ def initiate_connections():
if len(channels) < siteconf["channels"]:
if time.time() - sitestate["last_failed_connect"] >=
siteconf["retry"]:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- log("Creating channel %s/%d..." % (sitename, s.fileno()))
s.setblocking(0)
try:
s.connect(siteconf["socket"])
@@ -108,7 +107,7 @@ def initiate_connections():
def do_heartbeats():
now = time.time()
for sitename, sitestate in g_sites.items():
- if sitestate["state"] == "ready":
+ if sitestate["state"] != "starting":
rate, timeout = sites[sitename]["heartbeat"]
channel = sitestate["heartbeat"]["channel"]
since = sitestate["heartbeat"]["since"]
@@ -133,7 +132,7 @@ def send_heartbeat(sitename, sitestate):
while True:
next_channel = None
for channel in sitestate["channels"]:
- if channel["state"] == "ready":
+ if channel["state"] != "starting":
if next_channel == None or channel["since"] <
next_channel["since"]:
min_since = channel["since"]
next_channel = channel
@@ -143,7 +142,7 @@ def send_heartbeat(sitename, sitestate):
else:
channel = next_channel
- log("Sending heartbeat to channel %s/%d" % (sitename,
channel["socket"].fileno()))
+ # log("Sending heartbeat to channel %s/%d" % (sitename,
channel["socket"].fileno()))
try:
channel["socket"].send("GET status\nKeepAlive:
on\nColumns: execute_service_checks\n\n")
channel["state"] = "heartbeat"
@@ -233,9 +232,9 @@ def get_new_requests(readable):
for sitename, sitestate in g_sites.items():
for client in sitestate["clients"]:
if client["state"] == "idle" and \
- client["socket"] in readable:
+ (client["socket"] in readable or
client.get("nextrequest")):
try:
- request = receive_request(sitename, client["socket"])
+ request = receive_request(sitename, client)
if request:
client["state"] = "wait_for_channel"
client["since"] = time.time()
@@ -252,17 +251,24 @@ def get_new_requests(readable):
def distribute_requests():
for sitename, sitestate in g_sites.items():
+ if sitestate["state"] == "busy":
+ continue
waiting_clients = [ client for client in sitestate["clients"] if
client["state"] == "wait_for_channel"]
# Sort after waiting time, we should be fair to all...
waiting_clients.sort(cmp = lambda a, b: cmp(a["since"],
b["since"]))
- for channel in sitestate["channels"]:
- if not waiting_clients:
- break
- if channel["state"] == "ready":
- client = waiting_clients[0]
- del waiting_clients[0]
- forward_request(sitename, client, channel)
+ # one channel must always be kept for heartbeat
+ allowed_channels = len([c for c in sitestate["channels"] if
c["state"] in ["ready", "heartbeat"]])
+ if allowed_channels <= 1:
+ sitestate["state"] = "busy"
+ else:
+ for channel in sitestate["channels"]:
+ if not waiting_clients:
+ break
+ if channel["state"] == "ready":
+ client = waiting_clients[0]
+ del waiting_clients[0]
+ forward_request(sitename, client, channel)
def forward_request(sitename, client, channel):
cls = client["socket"]
@@ -271,10 +277,14 @@ def forward_request(sitename, client, channel):
client["request"], sitename, cls.fileno(), sitename, chs.fileno()))
try:
chs.send(client["request"])
- client["state"] = "wait_for_response"
- channel["state"] = "busy"
- client["channel"] = channel
- channel["client"] = client
+ if not client["request"].startswith("COMMAND"):
+ client["state"] = "wait_for_response"
+ channel["state"] = "busy"
+ client["channel"] = channel
+ channel["client"] = client
+ else:
+ client["request"] = ""
+ client["state"] = "idle"
except Exception, e:
if opt_debug:
raise
@@ -304,20 +314,24 @@ def respond_client_with_error(client, message):
# TODO: one malicious client can hang the whole proxy. In order
# to prevent this we'd need partial requests...
-def receive_request(sitename, sock):
- textbody = ""
- while not textbody.endswith("\n\n"):
+def receive_request(sitename, client):
+ # Note: Multisite can send several requests at once. For example
+ # a command and a wait query (reschedule button)
+ request = client.get("nextrequest", "")
+ while "\n\n" not in request:
try:
- chunk = sock.recv(65536)
- log("CHUNK: %s" % chunk)
+ chunk = client["socket"].recv(65536)
+ request += chunk
except socket.error, e:
if e.errno != 4: # Interrupted system cal
raise
if not chunk:
- log("Client %s/%d closed connection." % (sitename, sock.fileno()))
+ log("Client %s/%d closed connection." % (sitename,
client["socket"].fileno()))
return None
- textbody += chunk
- return textbody
+ end = request.index("\n\n")
+ client["nextrequest"] = request[end+2:]
+ request = request[:end+2]
+ return request
def get_responses(readable):
@@ -332,14 +346,11 @@ def get_responses(readable):
def receive_response(sitename, channel):
- log("HIRN: Hole Antwort...")
client = channel["client"]
# We always assume fixed16 as response header!
old_response = channel.get("response", "")
- log("HIRN: altes test: %s" % old_response)
try:
chunk = channel["socket"].recv(65536)
- log("HIRN: neuer Chunk: %s" % chunk)
if not chunk:
raise Exception("Connect closed by foreign host")
@@ -355,12 +366,10 @@ def receive_response(sitename, channel):
return
if len(response) < 16: # header not yet complete
- log("HIRN: noch nicht fertig, nur [%s]" % response)
return
try:
bodylength = int(response[3:15])
- log("HIRN: koerper ist %d bytes" % bodylength)
except Exception, e:
if opt_debug:
raise
@@ -383,6 +392,7 @@ def receive_response(sitename, channel):
# Response complete
channel["state"] = "ready"
+ g_sites[sitename]["state"] = "ready" # at least one channel free
channel["response"] = ""
del channel["client"]
del client["channel"]
@@ -411,8 +421,8 @@ def receive_heartbeat(sitename, channel):
sitename, channel["socket"].fileno(), chunk))
disconnect_from_site(sitename)
else:
- log("Channel %s/%d: received valid heartbeat" % (
- sitename, channel["socket"].fileno()))
+ # log("Channel %s/%d: received valid heartbeat" % (
+ # sitename, channel["socket"].fileno()))
channel["state"] = "ready"
sitestate["heartbeat"]["since"] = time.time()
sitestate["heartbeat"]["channel"] = None
@@ -644,6 +654,8 @@ def read_configuration():
"last_reset" : time.time(),
"last_failed_connect" : 0,
}
+ if siteconf["channels"] <= 1:
+ raise bail_out("Invalid configuration for site %s: you need at least two
channels" % sitename)
def do_restart():
log("Restarting myself")