Checkmk git commits
June 2013
checkmk-commits@lists.checkmk.com
4 participants
186 discussions
Check_MK Git: check_mk: cisco_temp_perf: convert to new check API
by Mathias Kettner
Module: check_mk
Branch: master
Commit: 8d4d51dbfffe7aab48c3cc2fc9a20e749aad9581
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=8d4d51dbfffe7a…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Wed Jun 5 11:50:19 2013 +0200

cisco_temp_perf: convert to new check API

---

 checks/cisco_temp_perf | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/checks/cisco_temp_perf b/checks/cisco_temp_perf
index eea93ee..baa39ee 100644
--- a/checks/cisco_temp_perf
+++ b/checks/cisco_temp_perf
@@ -87,10 +87,17 @@ def check_cisco_temp_perf(item, _no_params, info):
             return (2, "state is %s%s" % (statename, temptext), perfdata)
     return (3, "Item %s not found in SNMP data" % item)
 
-check_info['cisco_temp_perf'] = (check_cisco_temp_perf, "Temperature %s", 1, inventory_cisco_temp_perf)
-snmp_info['cisco_temp_perf'] = ( ".1.3.6.1.4.1.9.9.13.1.3.1", [ 2, 3, 4, 6, OID_END ] ) # CISCO-SMI
-snmp_scan_functions['cisco_temp_perf'] = \
-    lambda oid: "cisco" in oid(".1.3.6.1.2.1.1.1.0").lower() and \
-                oid(".1.3.6.1.4.1.9.9.13.1.3.1.3.*") != None and \
-                oid(".1.3.6.1.4.1.9.9.91.1.1.1.1.*") == None,
-checkgroup_of["cisco_temp_perf"] = "temperature_auto"
+
+check_info['cisco_temp_perf'] = {
+    "check_function" : check_cisco_temp_perf,
+    "inventory_function" : inventory_cisco_temp_perf,
+    "service_description" : "Temperature %s",
+    "has_perfdata" : True,
+    "snmp_scan_function" :
+        lambda oid: "cisco" in oid(".1.3.6.1.2.1.1.1.0").lower() and \
+                    oid(".1.3.6.1.4.1.9.9.13.1.3.1.3.*") != None and \
+                    oid(".1.3.6.1.4.1.9.9.91.1.1.1.1.*") == None,
+    "snmp_info" : ( ".1.3.6.1.4.1.9.9.13.1.3.1",
+                    [2, 3, 4, 6, OID_END ] ), # CISCO-SMI
+    "group" : "temperature_auto"
+}
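The conversion in this commit folds four separate registrations (`check_info` tuple, `snmp_info`, `snmp_scan_functions`, `checkgroup_of`) into a single dictionary. A minimal sketch of that mapping follows. The helper name `convert_legacy_check` and its parameters are hypothetical and only illustrate the key names visible in the diff; Check_MK itself is not known to ship such a function.

```python
def convert_legacy_check(legacy_check_info, snmp_info=None,
                         snmp_scan_function=None, group=None):
    """Build a dict-style check_info entry from the old separate pieces.

    Hypothetical helper for illustration only; not part of Check_MK.
    """
    # Tuple order as seen in the removed check_info line of the diff:
    # (check function, service description, has_perfdata flag, inventory function)
    check_function, service_description, has_perfdata, inventory_function = legacy_check_info

    entry = {
        "check_function": check_function,
        "inventory_function": inventory_function,
        "service_description": service_description,
        "has_perfdata": bool(has_perfdata),  # the old API used 0/1
    }

    # The former side tables become optional keys of the same dict.
    if snmp_info is not None:
        entry["snmp_info"] = snmp_info
    if snmp_scan_function is not None:
        entry["snmp_scan_function"] = snmp_scan_function
    if group is not None:
        entry["group"] = group
    return entry
```

Keeping everything under one key means a check can be inspected or removed in one place instead of in four parallel tables.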
Check_MK Git: check_mk: Livestatus proxy seems to work
by Mathias Kettner
Module: check_mk
Branch: master
Commit: 24aa11eb5401dce48a12e28f9cd5b70b41b3fd94
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=24aa11eb5401dc…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Wed Jun 5 11:26:53 2013 +0200

Livestatus proxy seems to work

---

 doc/treasures/liveproxy/liveproxyd | 60 +++++++++++++++++++++---------------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 4b2ac99..431debe 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -126,7 +126,7 @@ def initiate_connections():
 def do_heartbeats():
     now = time.time()
     for sitename, sitestate in g_sites.items():
-        if sitestate["state"] != "starting":
+        if sitestate["channels"]:
             rate, timeout = sites[sitename]["heartbeat"]
             channel = sitestate["heartbeat"]["channel"]
             since = sitestate["heartbeat"]["since"]
@@ -170,7 +170,7 @@ def send_heartbeat(sitename, sitestate):
                 sitestate["heartbeat"]["channel"] = channel
                 return True
             except Exception, e:
-                log("Cannot send heartbeat to channel %s/%d: %s" % (sitename, channel["socket"].filelo(), e))
+                log("Cannot send heartbeat to channel %s/%d: %s" % (sitename, channel["socket"].fileno(), e))
                 channel["state"] = "error"
 
 
@@ -197,7 +197,6 @@ def complete_connections(writable):
                     channel["socket"].setblocking(1) # avoid signals from interrupting us
                     log("Channel %s/%d successfully connected" % (sitename, channel["socket"].fileno()))
                     channel["state"] = "ready"
-                    sitestate["state"] = "ready" # at least one connection is up
                 except Exception, e:
                     log("Failed to connect channel %s/%d: %s" % (sitename, channel["socket"].fileno(), e))
                     channel["state"] = "error"
@@ -254,20 +253,24 @@ def get_new_requests(readable):
                (client["socket"] in readable or client.get("nextrequest")):
                 try:
                     request = receive_request(sitename, client)
-                    if request and not request.startswith("GET") and not request.startswith("COMMAND"):
-                        log("Invalid request [%s] from client %s/%d" % (
-                            request.replace("\n", "\\n"), sitename, client["socket"].fileno()))
+                    if not request:
                         client["state"] = "closed"
-                    elif request and not request.startswith("COMMAND") and "ResponseHeader: fixed16\n" not in request:
-                        respond_client_with_error(client, "Invalid request, you must specify ResponseHeader: fixed16.")
-                    elif request:
-                        client["state"] = "wait_for_channel"
-                        client["since"] = time.time()
-                        client["request"] = request
-                        # log("Get new request from %s/%d (%d bytes)" %
-                        #     (sitename, client["socket"].fileno(), len(request)))
                     else:
-                        client["state"] = "closed"
+                        if sitestate["state"] == "starting":
+                            respond_client_with_error(client, "Site is currently not reachable.")
+                        elif not request.startswith("GET") and not request.startswith("COMMAND"):
+                            log("Invalid request [%s] from client %s/%d" % (
+                                request.replace("\n", "\\n"), sitename, client["socket"].fileno()))
+                            client["state"] = "closed"
+                        elif not request.startswith("COMMAND") and "ResponseHeader: fixed16\n" not in request:
+                            respond_client_with_error(client,
+                                "Invalid request, you must specify ResponseHeader: fixed16.")
+                        else:
+                            client["state"] = "wait_for_channel"
+                            client["since"] = time.time()
+                            client["request"] = request
+                            # log("Get new request from %s/%d (%d bytes)" %
+                            #     (sitename, client["socket"].fileno(), len(request)))
                 except Exception, e:
                     if opt_debug:
                         raise
@@ -276,7 +279,7 @@ def get_new_requests(readable):
 
 def distribute_requests():
     for sitename, sitestate in g_sites.items():
-        if sitestate["state"] == "busy":
+        if sitestate["state"] != "ready":
             continue
 
         waiting_clients = [ client for client in sitestate["clients"] if client["state"] == "wait_for_channel"]
@@ -328,11 +331,13 @@ def respond_client_with_error(client, message):
             except socket.error, e:
                 if e.errno == 4:
                     continue # Interrupted system call
+                else:
+                    raise
     except Exception, e:
         if opt_debug:
             raise
-        log("Cannot send error message to client %s/%d: %s" % (
-            sitename, client["socket"].fileno(), e))
+        log("Cannot send error message to client %d: %s" % (
+            client["socket"].fileno(), e))
     client["state"] = "closed"
 
     if client.get("channel"):
@@ -474,8 +479,10 @@ def receive_heartbeat(sitename, channel):
                 sitename, channel["socket"].fileno(), chunk))
         disconnect_from_site(sitename)
     else:
-        # log("Channel %s/%d: received valid heartbeat" % (
-        #     sitename, channel["socket"].fileno()))
+        # Set site to ready if at least one other channel exists
+        if len(sitestate["channels"]) > 1:
+            sitestate["state"] = "ready"
+        sitestate["state"] = "ready" # at least one connection is up
         channel["state"] = "ready"
         sitestate["heartbeat"]["since"] = time.time()
         sitestate["heartbeat"]["channel"] = None
@@ -542,7 +549,7 @@ def garbage_collect_sockets():
             if channel["state"] not in [ "error", "closed" ]]
 
 def format_time(t):
-    return "%s (%d secs ago)" % (
+    return "%s (%3d secs ago)" % (
         time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)),
         time.time() - t)
 
@@ -559,14 +566,16 @@ def dump_state():
         sf.write(" Last failed connect: %s\n" % format_time(sitestate["last_failed_connect"]))
         sf.write(" Channels:\n")
         for channel in sitestate["channels"]:
-            sf.write(" %d - %s, client: %s\n" %
+            sf.write(" %3d - %-18s- client: %4s - since: %s\n" %
                 (channel["socket"].fileno(), channel["state"],
-                 channel.get("client") and channel["client"]["socket"].fileno() or "none"))
+                 channel.get("client") and channel["client"]["socket"].fileno() or "none",
+                 format_time(channel["since"])))
         sf.write(" Clients:\n")
         for client in sitestate["clients"]:
-            sf.write(" %d - %s, channel: %s\n" %
+            sf.write(" %3d - %-18s- channel: %4s - since: %s\n" %
                 (client["socket"].fileno(), client["state"],
-                 client.get("channel") and client["channel"]["socket"].fileno() or "none"))
+                 client.get("channel") and client["channel"]["socket"].fileno() or "none",
+                 format_time(client["since"])))
         sf.write(" Heartbeat:\n")
         hb = sitestate["heartbeat"]
 
@@ -814,6 +823,7 @@ try:
     signal.signal(15, signal_handler) # TERM
     signal.signal(10, signal_handler) # USR1
     signal.signal(12, signal_handler) # USR2
+    signal.signal(13, signal.SIG_IGN) # PIPE
 
     # Now let's go...
     liveproxyd_run()
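The reworked branch in `get_new_requests` is easier to follow as a pure decision function. The sketch below mirrors the order of checks in the diff (empty request, site still starting, unknown verb, missing `ResponseHeader: fixed16`). The function name `classify_request` and the returned action strings are assumptions made for this illustration and do not appear in `liveproxyd`.

```python
def classify_request(request, site_state):
    """Decide what to do with a client request; returns (action, message).

    Hypothetical helper: the action names "close", "error" and "forward"
    are made up for this sketch.
    """
    if not request:
        # recv() delivered nothing: the client has gone away.
        return ("close", None)
    if site_state == "starting":
        return ("error", "Site is currently not reachable.")
    if not request.startswith(("GET", "COMMAND")):
        # Unknown verb: the diff logs the request and closes the client.
        return ("close", "Invalid request")
    if not request.startswith("COMMAND") and "ResponseHeader: fixed16\n" not in request:
        return ("error", "Invalid request, you must specify ResponseHeader: fixed16.")
    # Valid request: the client would now wait for a free channel.
    return ("forward", None)
```

Checking the site state before the syntax of the request means a client of an unreachable site gets an immediate error response instead of waiting for a channel that does not exist.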
Check_MK Git: check_mk: liveproxyd: first working multisite
by Mathias Kettner
Module: check_mk
Branch: master
Commit: d17e77b8fd0d5b8218249a095343262c2244034d
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=d17e77b8fd0d5b…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 12:03:12 2013 +0200

liveproxyd: first working multisite

---

 doc/treasures/liveproxy/liveproxyd | 43 +++++++++++++++++++++---------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 3a68359..0c2a371 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -34,14 +34,15 @@ def liveproxyd_run():
 
     while True:
         try:
+            dump_state()
             initiate_connections()
-            readable, writable = do_select()
+            readable, writable = do_select(5.0)
             complete_connections(writable)
             accept_new_clients(readable)
             get_new_requests(readable)
             distribute_requests()
             get_responses(readable)
-            # garbage_collect_sockets()
+            garbage_collect_sockets()
 
         except MKSignalException, e:
             log("Got signal %d." % e._signum)
@@ -89,7 +90,7 @@ def complete_connections(writable):
                 channel["state"] = "ready"
 
 # Master/Mega/Central select(). We are going to be the select() master. Harhar.
-def do_select():
+def do_select(timeout):
     read_fds = []
     write_fds = []
 
@@ -112,7 +113,7 @@ def do_select():
             if channel["state"] == "busy":
                 read_fds.append(channel["socket"])
 
-    r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
+    r_able, w_able, x_able = select.select(read_fds, write_fds, [], timeout)
     return r_able, w_able
 
 
@@ -143,7 +144,6 @@ def get_new_requests(readable):
                             (sitename, client["socket"].fileno(), len(request)))
                     else:
                         client["state"] = "closed"
-                        client["socket"] = None
                 except Exception, e:
                     if opt_debug:
                         raise
@@ -191,7 +191,6 @@ def respond_client_with_error(client, message):
         log("Cannot send error message to client %s/%d: %s" % (
             sitename, client["socket"].fileno(), e))
     client["state"] = "closed"
-    client["socket"] = None
 
     if "channel" in client:
         channel = client["channel"]
@@ -236,7 +235,6 @@ def receive_response(sitename, channel):
         log("Cannot read response from %s/%d: %s" %
             (sitename, channel["socket"].fileno(), e))
         channel["state"] = "error"
-        channel["socket"] = None
         respond_client_with_error(client, "Error receiving response from target site %s: %s" %
             (sitenam, e))
 
@@ -250,21 +248,23 @@ def receive_response(sitename, channel):
             raise
         log("Malformed response header from cannel %s/%d" % sitename, channel["socket"].fileno())
         respond_client_with_error(client, "Malformed response from site %s" % sitename)
-        channel["state"] = error
-        channel["socket"] = None
+        channel["state"] = "error"
+        channel["response"] = ""
 
     if len(response) > bodylength + 16:
-        log("Too large response on channel %s/%d (%d exceeding bytes)" % (
-            sitename, channel["socket"].fileno(), (len(response) - bodylength - 16)))
+        log("Too large response on channel %s/%d (%d exceeding bytes: [%s])" % (
+            sitename, channel["socket"].fileno(), len(response) - bodylength - 16, response[bodylength + 16:]))
         respond_client_with_error(client, "To long response from site %s" % sitename)
-        channel["state"] = error
-        channel["socket"] = None
+        channel["state"] = "error"
+        channel["response"] = ""
+
 
     elif len(response) < bodylength + 16:
         return
 
     # Response complete
     channel["state"] = "ready"
+    channel["response"] = ""
     del channel["client"]
     del client["channel"]
 
@@ -277,7 +277,6 @@ def receive_response(sitename, channel):
         log("Cannot forward response from client %s/%d to client %s/%d: %s",
             (sitename, channel["socket"].fileno(), sitename, client["socket"].fileno(), e))
         client["state"] = "error"
-        client["socket"] = None
 
 
 
@@ -304,6 +303,14 @@ def create_unix_socket(sitename):
     g_sites[sitename]["socket"] = s
 
 
+def garbage_collect_sockets():
+    for sitename, sitestate in g_sites.items():
+        sitestate["channels"] = [
+            channel for channel in sitestate["channels"]
+            if channel["state"] not in [ "error", "closed" ]]
+        sitestate["clients"] = [
+            channel for channel in sitestate["clients"]
+            if channel["state"] not in [ "error", "closed" ]]
 
 def dump_state():
     log("Current connection state:")
@@ -311,12 +318,12 @@ def dump_state():
         log("[%s]" % sitename)
         log(" Channels:")
         for channel in sitestate["channels"]:
-            log(" %s/%d - %s, client: %s" %
-                (sitename, channel["socket"].fileno(), channel["state"], channel.get("client") and channel["client"]["socket"].fileno() or "none"))
+            log(" %d - %s, client: %s" %
+                (channel["socket"].fileno(), channel["state"], channel.get("client") and channel["client"]["socket"].fileno() or "none"))
         log(" Clients:")
         for client in sitestate["clients"]:
-            log(" %s/%d - %s, channel: %s" %
-                (sitename, client["socket"].fileno(), client["state"], client.get("channel") and client["channel"]["socket"].fileno() or "none"))
+            log(" %d - %s, channel: %s" %
+                (client["socket"].fileno(), client["state"], client.get("channel") and client["channel"]["socket"].fileno() or "none"))
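This commit stops setting `socket` to `None` on failure and instead lets `garbage_collect_sockets()` drop every entry whose state is `error` or `closed` once per loop iteration. A minimal Python 3 sketch of that filtering step follows. The function name `garbage_collect` and the `sites` parameter are assumptions for this example; the original works on the global `g_sites`.

```python
DEAD_STATES = ("error", "closed")


def garbage_collect(sites):
    """Remove dead channels and clients from every site, in place.

    `sites` is assumed to have the shape of g_sites in the diff:
    {sitename: {"channels": [...], "clients": [...]}} where each entry
    is a dict with a "state" key.
    """
    for sitestate in sites.values():
        for kind in ("channels", "clients"):
            sitestate[kind] = [
                entry for entry in sitestate[kind]
                if entry["state"] not in DEAD_STATES
            ]
```

Like the function in the diff, this sketch only drops the references and does not call `close()` on the sockets; whether that happens elsewhere in `liveproxyd` is not visible in this commit.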
Check_MK Git: check_mk: liveproxyd: first successful forwarding
by Mathias Kettner
Module: check_mk
Branch: master
Commit: 66a3f4148d9e74842323e300d7c3d5282c321873
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=66a3f4148d9e74…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 11:41:25 2013 +0200

liveproxyd: first successful forwarding

---

 doc/treasures/liveproxy/liveproxyd | 130 +++++++++++++++++++++++++++++++-----
 1 file changed, 114 insertions(+), 16 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index d1313e4..3cb14ce 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -32,24 +32,26 @@ g_sites = {
 def liveproxyd_run():
     open_client_sockets()
 
-    try:
-        while True:
+    while True:
+        try:
             initiate_connections()
             readable, writable = do_select()
             complete_connections(writable)
             accept_new_clients(readable)
             get_new_requests(readable)
             distribute_requests()
-
-
-    except MKSignalException, e:
-        log("Got signal %d." % e._signum)
-        if e._signum in [ 2, 3, 15 ]:
-            sys.exit(0)
-        elif e._signum == 10:
-            do_restart()
-        elif e._signum == 1:
-            do_reload()
+            get_responses(readable)
+
+        except MKSignalException, e:
+            log("Got signal %d." % e._signum)
+            if e._signum in [ 2, 3, 15 ]:
+                sys.exit(0)
+            elif e._signum == 10:
+                do_restart()
+            elif e._signum == 12:
+                dump_state()
+            elif e._signum == 1:
+                do_reload()
 
 def initiate_connections():
     # Create new channels to target sites. Nonblocking!
@@ -104,6 +106,11 @@ def do_select():
             if client["state"] == "idle":
                 read_fds.append(client["socket"])
 
+        # Responses from channels
+        for channel in sitestate["channels"]:
+            if channel["state"] == "busy":
+                read_fds.append(channel["socket"])
+
     r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
     return r_able, w_able
 
@@ -162,14 +169,16 @@ def forward_request(sitename, client, channel):
         chs.send(client["request"])
         client["state"] = "wait_for_response"
         channel["state"] = "busy"
+        client["channel"] = channel
+        channel["client"] = client
     except Exception, e:
         if opt_debug:
             raise
         log("Error: %s" % e)
-        close_client_with_error(client, str(e))
+        respond_client_with_error(client, str(e))
 
 
-def close_client_with_error(client, message):
+def respond_client_with_error(client, message):
     try:
         client["socket"].send("400%12d\n%s\n" % (len(message) + 2, message))
     except Exception, e:
@@ -177,9 +186,15 @@ def close_client_with_error(client, message):
             raise
         log("Cannot send error message to client %s/%d: %s" % (
             sitename, client["socket"].fileno(), e))
+    client["state"] = "closed"
+    client["socket"] = None
+
+    if "channel" in client:
+        channel = client["channel"]
+        del channel["client"]
+        del client["channel"]
 
-    client["socket"] = None
-    client["state"] = "closed"
+        client["state"] = "idle"
 
 
 
@@ -192,6 +207,72 @@ def receive_request(sock):
     return textbody
 
 
+def get_responses(readable):
+    for sitename, sitestate in g_sites.items():
+        for channel in sitestate["channels"]:
+            if channel["state"] == "busy" and channel["socket"] in readable:
+                receive_response(sitename, channel)
+
+
+def receive_response(sitename, channel):
+    client = channel["client"]
+    # We always assume fixed16 as response header!
+    old_response = channel.get("response", "")
+    try:
+        chunk = channel["socket"].recv(65536)
+        response = old_response + chunk
+        channel["response"] = response
+    except Exception, e:
+        if opt_debug:
+            raise
+        log("Cannot read response from %s/%d: %s" %
+            (sitename, channel["socket"].fileno(), e))
+        channel["state"] = "error"
+        channel["socket"] = None
+        respond_client_with_error(client, "Error receiving response from target site %s: %s" %
+            (sitenam, e))
+
+    if len(response) < 16: # header not yet complete
+        return
+
+    try:
+        bodylength = int(response[3:15])
+    except Exception, e:
+        if opt_debug:
+            raise
+        log("Malformed response header from cannel %s/%d" % sitename, channel["socket"].fileno())
+        respond_client_with_error(client, "Malformed response from site %s" % sitename)
+        channel["state"] = error
+        channel["socket"] = None
+
+    if len(response) > bodylength + 16:
+        log("Too large response on channel %s/%d (%d exceeding bytes)" % (
+            sitename, channel["socket"].fileno(), (len(response) - bodylength - 16)))
+        respond_client_with_error(client, "To long response from site %s" % sitename)
+        channel["state"] = error
+        channel["socket"] = None
+
+    elif len(response) < bodylength + 16:
+        return
+
+    # Response complete
+    channel["state"] = "ready"
+    del channel["client"]
+    del client["channel"]
+
+    try:
+        client["socket"].send(response)
+        client["state"] = "idle"
+    except Exception, e:
+        if opt_debug:
+            raise
+        log("Cannot forward response from client %s/%d to client %s/%d: %s",
+            (sitename, channel["socket"].fileno(), sitename, client["socket"].fileno(), e))
+        client["state"] = "error"
+        client["socket"] = None
+
+
+
 def open_client_sockets():
     if not os.path.exists(opt_socketdir):
         os.makedirs(opt_socketdir)
@@ -216,6 +297,22 @@ def create_unix_socket(sitename):
     g_sites[sitename]["socket"] = s
 
 
+def dump_state():
+    log("Current connection state:")
+    for sitename, sitestate in g_sites.items():
+        log("[%s]" % sitename)
+        log(" Channels:")
+        for channel in sitestate["channels"]:
+            log(" %s/%d - %s, client: %s" %
+                (sitename, channel["socket"].fileno(), channel["state"], channel.get("client")))
+        log(" Clients:")
+        for client in sitestate["clients"]:
+            log(" %s/%d - %s, channel: %s" %
+                (sitename, client["socket"].fileno(), client["state"], client.get("channel")))
+
+
+
+
 #.
 # .--Daemon/main---------------------------------------------------------.
 # | ____ __ _ |
@@ -429,6 +526,7 @@ try:
     signal.signal(3, signal_handler) # QUIT
     signal.signal(15, signal_handler) # TERM
     signal.signal(10, signal_handler) # USR1
+    signal.signal(12, signal_handler) # USR2
 
     # Now let's go...
     liveproxyd_run()
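`receive_response` relies on the `fixed16` framing: a 16-byte header whose bytes 3 to 14 carry the body length, followed by exactly that many bytes. The sketch below shows the same incremental framing check in Python 3 on `str` data. The names `split_fixed16` and `make_error_response` are hypothetical. Note one deliberate difference: the sketch derives the length field from the actual body, whereas the diff writes `len(message) + 2` for a body of `message` plus one newline.

```python
HEADER_LEN = 16  # 3 status digits + 12 characters of length + newline


def split_fixed16(buffer):
    """Try to cut one complete fixed16 response off the front of buffer.

    Returns (status, body, rest) once header and body are complete,
    otherwise None so the caller can keep reading. A malformed header
    raises ValueError from int().
    """
    if len(buffer) < HEADER_LEN:
        return None  # header not yet complete
    status = int(buffer[0:3])
    body_length = int(buffer[3:15])  # same slice as in the diff
    end = HEADER_LEN + body_length
    if len(buffer) < end:
        return None  # body not yet complete
    return status, buffer[HEADER_LEN:end], buffer[end:]


def make_error_response(message):
    """Build a status-400 response in the framing used by the proxy."""
    body = message + "\n"
    return "400%12d\n%s" % (len(body), body)
```

A caller would append each received chunk to its buffer and call `split_fixed16` again until it stops returning `None`; a non-empty `rest` corresponds to the "Too large response" case that the diff treats as an error.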
Check_MK Git: check_mk: liveproxyd: detect clients closing the socket
by Mathias Kettner
Module: check_mk
Branch: master
Commit: f560d385f372014efa93ecf0e7ca976f211120f6
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=f560d385f37201…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 11:49:52 2013 +0200

liveproxyd: detect clients closing the socket

---

 doc/treasures/liveproxy/liveproxyd | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 3cb14ce..3a68359 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -41,6 +41,7 @@ def liveproxyd_run():
             get_new_requests(readable)
             distribute_requests()
             get_responses(readable)
+            # garbage_collect_sockets()
 
         except MKSignalException, e:
             log("Got signal %d." % e._signum)
@@ -118,7 +119,6 @@ def do_select():
 def accept_new_clients(readable):
     for sitename, sitestate in g_sites.items():
         if sitestate["socket"] in readable:
-            print readable
             try:
                 s, addrinfo = sitestate["socket"].accept()
                 log("Accepted new client %s/%d" % (sitename, s.fileno()))
@@ -134,12 +134,16 @@ def get_new_requests(readable):
             if client["state"] == "idle" and \
                client["socket"] in readable:
                 try:
-                    request = receive_request(client["socket"])
-                    client["state"] = "wait_for_channel"
-                    client["since"] = time.time()
-                    client["request"] = request
-                    log("Get new request from %s/%d (%d bytes)" %
-                        (sitename, client["socket"].fileno(), len(request)))
+                    request = receive_request(sitename, client["socket"])
+                    if request:
+                        client["state"] = "wait_for_channel"
+                        client["since"] = time.time()
+                        client["request"] = request
+                        log("Get new request from %s/%d (%d bytes)" %
+                            (sitename, client["socket"].fileno(), len(request)))
+                    else:
+                        client["state"] = "closed"
+                        client["socket"] = None
                 except Exception, e:
                     if opt_debug:
                         raise
@@ -200,10 +204,14 @@ def respond_client_with_error(client, message):
 
 # TODO: one malicious client can hang the whole proxy. In order
 # to prevent this we'd need partial requests...
-def receive_request(sock):
+def receive_request(sitename, sock):
     textbody = ""
     while not textbody.endswith("\n\n"):
-        textbody += sock.recv(65536)
+        chunk = sock.recv(65536)
+        if not chunk:
+            log("Client %s/%d closed connection." % (sitename, sock.fileno()))
+            return None
+        textbody += chunk
     return textbody
 
 
@@ -304,11 +312,11 @@ def dump_state():
         log(" Channels:")
         for channel in sitestate["channels"]:
             log(" %s/%d - %s, client: %s" %
-                (sitename, channel["socket"].fileno(), channel["state"], channel.get("client")))
+                (sitename, channel["socket"].fileno(), channel["state"], channel.get("client") and channel["client"]["socket"].fileno() or "none"))
         log(" Clients:")
         for client in sitestate["clients"]:
             log(" %s/%d - %s, channel: %s" %
-                (sitename, client["socket"].fileno(), client["state"], client.get("channel")))
+                (sitename, client["socket"].fileno(), client["state"], client.get("channel") and client["channel"]["socket"].fileno() or "none"))
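The fix in `receive_request` rests on a general socket rule: `recv()` returning an empty result means the peer has closed the connection, so a read loop must stop instead of spinning. The following self-contained Python 3 sketch demonstrates that with a local socket pair. It is an illustration under the assumption of blocking stream sockets, not the daemon's code, and it omits the logging and the `sitename` argument from the diff.

```python
import socket


def receive_request(sock):
    """Read one request terminated by an empty line.

    Returns the request as text, or None if the peer closed the
    connection before a complete request arrived.
    """
    data = b""
    while not data.endswith(b"\n\n"):
        chunk = sock.recv(65536)
        if not chunk:
            # Empty read: the other side has closed the socket.
            return None
        data += chunk
    return data.decode("utf-8")


if __name__ == "__main__":
    client_side, proxy_side = socket.socketpair()
    client_side.sendall(b"GET status\n\n")
    print(repr(receive_request(proxy_side)))  # the complete request
    client_side.close()
    print(repr(receive_request(proxy_side)))  # None after the peer closed
    proxy_side.close()
```

As the TODO comment in the diff notes, a loop like this still blocks on a client that sends only part of a request; the sketch has the same limitation.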
Check_MK Git: check_mk: First skeleton of livestatus proxy
by Mathias Kettner
Module: check_mk
Branch: master
Commit: a051c9cba3f0db90e93b966079883c88a669a370
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=a051c9cba3f0db…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 10:46:24 2013 +0200

First skeleton of livestatus proxy

---

 doc/treasures/liveproxy/TODO | 11 +
 doc/treasures/liveproxy/liveproxyd | 370 +++++++++++++++++++++++++++++++++
 doc/treasures/liveproxy/liveproxyd.mk | 8 +
 3 files changed, 389 insertions(+)

Diff: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commitdiff;h=a051c9cba3…
Check_MK Git: check_mk: Livestatus proxy: forwarding implemented
by Mathias Kettner
Module: check_mk
Branch: master
Commit: c0f162de2cab90ec10e882578fb90cc269fd32bc
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=c0f162de2cab90…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 11:14:38 2013 +0200

Livestatus proxy: forwarding implemented

---

 doc/treasures/liveproxy/liveproxyd | 107 +++++++++++++++++++++++++++++++-----
 1 file changed, 92 insertions(+), 15 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index f08771e..d1313e4 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -38,6 +38,8 @@ def liveproxyd_run():
             readable, writable = do_select()
             complete_connections(writable)
             accept_new_clients(readable)
+            get_new_requests(readable)
+            distribute_requests()
 
 
     except MKSignalException, e:
@@ -81,7 +83,7 @@ def complete_connections(writable):
         for channel in sitestate["channels"]:
             if channel["state"] == "connecting" and channel["socket"] in writable:
                 log("Channel %s/%d successfully connected" % (sitename, channel["socket"].fileno()))
-                channel["state"] = "connected"
+                channel["state"] = "ready"
 
 # Master/Mega/Central select(). We are going to be the select() master. Harhar.
 def do_select():
@@ -97,10 +99,99 @@ def do_select():
         # new client connections
         read_fds.append(sitestate["socket"])
 
+        # new requests from existing clients
+        for client in sitestate["clients"]:
+            if client["state"] == "idle":
+                read_fds.append(client["socket"])
+
     r_able, w_able, x_able = select.select(read_fds, write_fds, [], 1.0)
     return r_able, w_able
 
 
+def accept_new_clients(readable):
+    for sitename, sitestate in g_sites.items():
+        if sitestate["socket"] in readable:
+            print readable
+            try:
+                s, addrinfo = sitestate["socket"].accept()
+                log("Accepted new client %s/%d" % (sitename, s.fileno()))
+                sitestate["clients"].append({"socket" : s, "state" : "idle", "since" : time.time()})
+            except Exception, e:
+                if opt_debug:
+                    raise
+                log("Failed to accept new client for %s: %s" % (sitename, e))
+
+def get_new_requests(readable):
+    for sitename, sitestate in g_sites.items():
+        for client in sitestate["clients"]:
+            if client["state"] == "idle" and \
+               client["socket"] in readable:
+                try:
+                    request = receive_request(client["socket"])
+                    client["state"] = "wait_for_channel"
+                    client["since"] = time.time()
+                    client["request"] = request
+                    log("Get new request from %s/%d (%d bytes)" %
+                        (sitename, client["socket"].fileno(), len(request)))
+                except Exception, e:
+                    if opt_debug:
+                        raise
+                    log("Cannot read request from client %s/%d: %s" %
+                        (sitename, client["socket"].fileno(), e))
+
+def distribute_requests():
+    for sitename, sitestate in g_sites.items():
+        waiting_clients = [ client for client in sitestate["clients"] if client["state"] == "wait_for_channel"]
+        # Sort after waiting time, we should be fair to all...
+        waiting_clients.sort(cmp = lambda a, b: cmp(a["since"], b["since"]))
+
+        for channel in sitestate["channels"]:
+            if not waiting_clients:
+                break
+            if channel["state"] == "ready":
+                client = waiting_clients[0]
+                del waiting_clients[0]
+                forward_request(sitename, client, channel)
+
+def forward_request(sitename, client, channel):
+    cls = client["socket"]
+    chs = channel["socket"]
+    log("Forwarding request from client %s/%d to channel %s/%d" % (
+        sitename, cls.fileno(), sitename, chs.fileno()))
+    try:
+        chs.send(client["request"])
+        client["state"] = "wait_for_response"
+        channel["state"] = "busy"
+    except Exception, e:
+        if opt_debug:
+            raise
+        log("Error: %s" % e)
+        close_client_with_error(client, str(e))
+
+
+def close_client_with_error(client, message):
+    try:
+        client["socket"].send("400%12d\n%s\n" % (len(message) + 2, message))
+    except Exception, e:
+        if opt_debug:
+            raise
+        log("Cannot send error message to client %s/%d: %s" % (
+            sitename, client["socket"].fileno(), e))
+
+    client["socket"] = None
+    client["state"] = "closed"
+
+
+
+# TODO: one malicious client can hang the whole proxy. In order
+# to prevent this we'd need partial requests...
+def receive_request(sock):
+    textbody = ""
+    while not textbody.endswith("\n\n"):
+        textbody += sock.recv(65536)
+    return textbody
+
+
 def open_client_sockets():
     if not os.path.exists(opt_socketdir):
         os.makedirs(opt_socketdir)
@@ -125,20 +216,6 @@ def create_unix_socket(sitename):
     g_sites[sitename]["socket"] = s
 
 
-def accept_new_clients(readable):
-    for sitename, sitestate in g_sites.items():
-        if sitestate["socket"] in readable:
-            print readable
-            try:
-                s, addrinfo = sitestate["socket"].accept()
-                log("Accepted new client %s/%d" % (sitename, s.fileno()))
-                sitestate["clients"].append(s)
-            except Exception, e:
-                if opt_debug:
-                    raise
-                log("Failed to accept new client for %s: %s" % (sitename, e))
-
-
 #.
 # .--Daemon/main---------------------------------------------------------.
 # | ____ __ _ |
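`distribute_requests` hands free channels to the clients that have waited longest. The diff does this with `sort(cmp=...)`, which exists only in Python 2. A Python 3 sketch of the same pairing, using a `key` function instead, is shown below. The function name `pair_clients_with_channels` is hypothetical, and unlike the original it returns the pairs rather than calling `forward_request` itself.

```python
def pair_clients_with_channels(clients, channels):
    """Match waiting clients to ready channels, longest-waiting client first.

    Both arguments are lists of dicts in the shape used by the diff:
    clients carry "state" and "since", channels carry "state".
    Returns a list of (client, channel) tuples.
    """
    waiting = sorted(
        (client for client in clients if client["state"] == "wait_for_channel"),
        key=lambda client: client["since"],  # oldest timestamp first
    )
    ready = [channel for channel in channels if channel["state"] == "ready"]
    # zip() stops at the shorter list, so surplus clients keep waiting
    # and surplus channels stay idle.
    return list(zip(waiting, ready))
```

Returning the pairs keeps the selection logic free of socket I/O, which makes the fairness rule easy to check in isolation.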
Check_MK Git: check_mk: Merge branch 'master' of ssh://mathias-kettner.de/check_mk
by Mathias Kettner
Module: check_mk
Branch: master
Commit: fe8cc3aa434aebd7b34ff1ce046111fe2b1ccca5
URL: http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=fe8cc3aa434aeb…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date: Tue Jun 4 17:43:50 2013 +0200

Merge branch 'master' of ssh://mathias-kettner.de/check_mk

---
Check_MK Git: check_mk: liveproxy: fixed several side cases
by Mathias Kettner
Module: check_mk
Branch: master
Commit: ef958a73ac5d92c6b2aa96790b3b3618a9b00d33
URL:    http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=ef958a73ac5d92…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date:   Tue Jun 4 15:29:39 2013 +0200

liveproxy: fixed several side cases

---

 doc/treasures/liveproxy/liveproxyd | 78 +++++++++++++++++++++--------------
 1 file changed, 45 insertions(+), 33 deletions(-)

diff --git a/doc/treasures/liveproxy/liveproxyd b/doc/treasures/liveproxy/liveproxyd
index 19df726..0e746c0 100755
--- a/doc/treasures/liveproxy/liveproxyd
+++ b/doc/treasures/liveproxy/liveproxyd
@@ -21,7 +21,7 @@ sites = {}
 # State of all sites
 g_sites = {
 #    "mysite" : {
-#         "state" : "starting" / "up"
+#         "state" : "starting" / "up" / "busy"
 #         "socket" : -> UNIX server socket,
 #         "clients" : [ ... ],
 #         "channels" : [ ... ],
@@ -84,7 +84,6 @@ def initiate_connections():
         if len(channels) < siteconf["channels"]:
             if time.time() - sitestate["last_failed_connect"] >= siteconf["retry"]:
                 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                log("Creating channel %s/%d..." % (sitename, s.fileno()))
                 s.setblocking(0)
                 try:
                     s.connect(siteconf["socket"])
@@ -108,7 +107,7 @@ def initiate_connections():
 def do_heartbeats():
     now = time.time()
     for sitename, sitestate in g_sites.items():
-        if sitestate["state"] == "ready":
+        if sitestate["state"] != "starting":
             rate, timeout = sites[sitename]["heartbeat"]
             channel = sitestate["heartbeat"]["channel"]
             since = sitestate["heartbeat"]["since"]
@@ -133,7 +132,7 @@ def send_heartbeat(sitename, sitestate):
     while True:
         next_channel = None
         for channel in sitestate["channels"]:
-            if channel["state"] == "ready":
+            if channel["state"] != "starting":
                 if next_channel == None or channel["since"] < next_channel["since"]:
                     min_since = channel["since"]
                     next_channel = channel
@@ -143,7 +142,7 @@ def send_heartbeat(sitename, sitestate):
         else:
             channel = next_channel
 
-    log("Sending heartbeat to channel %s/%d" % (sitename, channel["socket"].fileno()))
+    # log("Sending heartbeat to channel %s/%d" % (sitename, channel["socket"].fileno()))
     try:
         channel["socket"].send("GET status\nKeepAlive: on\nColumns: execute_service_checks\n\n")
         channel["state"] = "heartbeat"
@@ -233,9 +232,9 @@ def get_new_requests(readable):
     for sitename, sitestate in g_sites.items():
         for client in sitestate["clients"]:
             if client["state"] == "idle" and \
-               client["socket"] in readable:
+               (client["socket"] in readable or client.get("nextrequest")):
                 try:
-                    request = receive_request(sitename, client["socket"])
+                    request = receive_request(sitename, client)
                     if request:
                         client["state"] = "wait_for_channel"
                         client["since"] = time.time()
@@ -252,17 +251,24 @@ def get_new_requests(readable):
 
 def distribute_requests():
     for sitename, sitestate in g_sites.items():
+        if sitestate["state"] == "busy":
+            continue
         waiting_clients = [ client for client in sitestate["clients"] if client["state"] == "wait_for_channel"]
         # Sort after waiting time, we should be fair to all...
         waiting_clients.sort(cmp = lambda a, b: cmp(a["since"], b["since"]))
 
-        for channel in sitestate["channels"]:
-            if not waiting_clients:
-                break
-            if channel["state"] == "ready":
-                client = waiting_clients[0]
-                del waiting_clients[0]
-                forward_request(sitename, client, channel)
+        # one channel must always be kept for heartbeat
+        allowed_channels = len([c for c in sitestate["channels"] if c["state"] in ["ready", "heartbeat"]])
+        if allowed_channels <= 1:
+            sitestate["state"] = "busy"
+        else:
+            for channel in sitestate["channels"]:
+                if not waiting_clients:
+                    break
+                if channel["state"] == "ready":
+                    client = waiting_clients[0]
+                    del waiting_clients[0]
+                    forward_request(sitename, client, channel)
 
 def forward_request(sitename, client, channel):
     cls = client["socket"]
@@ -271,10 +277,14 @@ def forward_request(sitename, client, channel):
         client["request"], sitename, cls.fileno(), sitename, chs.fileno()))
     try:
         chs.send(client["request"])
-        client["state"] = "wait_for_response"
-        channel["state"] = "busy"
-        client["channel"] = channel
-        channel["client"] = client
+        if not client["request"].startswith("COMMAND"):
+            client["state"] = "wait_for_response"
+            channel["state"] = "busy"
+            client["channel"] = channel
+            channel["client"] = client
+        else:
+            client["request"] = ""
+            client["state"] = "idle"
     except Exception, e:
         if opt_debug:
             raise
@@ -304,20 +314,24 @@ def respond_client_with_error(client, message):
 
 # TODO: one malicious client can hang the whole proxy. In order
 # to prevent this we'd need partial requests...
-def receive_request(sitename, sock):
-    textbody = ""
-    while not textbody.endswith("\n\n"):
+def receive_request(sitename, client):
+    # Note: Multisite can send several requests at once. For example
+    # a command and a wait query (reschedule button)
+    request = client.get("nextrequest", "")
+    while "\n\n" not in request:
         try:
-            chunk = sock.recv(65536)
-            log("CHUNK: %s" % chunk)
+            chunk = client["socket"].recv(65536)
+            request += chunk
         except socket.error, e:
             if e.errno != 4: # Interrupted system cal
                 raise
         if not chunk:
-            log("Client %s/%d closed connection." % (sitename, sock.fileno()))
+            log("Client %s/%d closed connection." % (sitename, client["socket"].fileno()))
             return None
-        textbody += chunk
-    return textbody
+    end = request.index("\n\n")
+    client["nextrequest"] = request[end+2:]
+    request = request[:end+2]
+    return request
 
 
 def get_responses(readable):
@@ -332,14 +346,11 @@ def get_responses(readable):
 
 
 def receive_response(sitename, channel):
-    log("HIRN: Hole Antwort...")
     client = channel["client"]
     # We always assume fixed16 as response header!
     old_response = channel.get("response", "")
-    log("HIRN: altes test: %s" % old_response)
     try:
         chunk = channel["socket"].recv(65536)
-        log("HIRN: neuer Chunk: %s" % chunk)
         if not chunk:
             raise Exception("Connect closed by foreign host")
 
@@ -355,12 +366,10 @@ def receive_response(sitename, channel):
         return
 
     if len(response) < 16: # header not yet complete
-        log("HIRN: noch nicht fertig, nur [%s]" % response)
         return
 
     try:
         bodylength = int(response[3:15])
-        log("HIRN: koerper ist %d bytes" % bodylength)
     except Exception, e:
         if opt_debug:
             raise
@@ -383,6 +392,7 @@ def receive_response(sitename, channel):
 
     # Response complete
     channel["state"] = "ready"
+    g_sites[sitename]["state"] = "ready" # at least one channel free
     channel["response"] = ""
     del channel["client"]
     del client["channel"]
@@ -411,8 +421,8 @@ def receive_heartbeat(sitename, channel):
             sitename, channel["socket"].fileno(), chunk))
         disconnect_from_site(sitename)
     else:
-        log("Channel %s/%d: received valid heartbeat" % (
-            sitename, channel["socket"].fileno()))
+        # log("Channel %s/%d: received valid heartbeat" % (
+        #     sitename, channel["socket"].fileno()))
         channel["state"] = "ready"
         sitestate["heartbeat"]["since"] = time.time()
         sitestate["heartbeat"]["channel"] = None
@@ -644,6 +654,8 @@ def read_configuration():
             "last_reset" : time.time(),
             "last_failed_connect" : 0,
         }
+        if siteconf["channels"] <= 1:
+            raise bail_out("Invalid configuration for site %s: you need at least two channels" % sitename)
 
 def do_restart():
     log("Restarting myself")
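
The reworked receive_request in this commit keeps whatever follows the first blank line in client["nextrequest"], so that a second request arriving in the same chunk is not lost. As a rough illustration only, the splitting step can be sketched in Python 3 like this. The function names split_request and drain, and the example payload, are hypothetical and not taken from liveproxyd.

```python
from typing import List, Optional, Tuple


def split_request(buffer: str) -> Tuple[Optional[str], str]:
    """Cut the first complete request (terminated by a blank line) off buffer.

    Returns (request, leftover). request is None while no blank line has
    arrived yet, in which case leftover is the unchanged buffer.
    """
    end = buffer.find("\n\n")
    if end == -1:
        return None, buffer
    return buffer[:end + 2], buffer[end + 2:]


def drain(buffer: str) -> Tuple[List[str], str]:
    """Collect every complete request in buffer and return the remainder."""
    requests = []
    while True:
        request, buffer = split_request(buffer)
        if request is None:
            return requests, buffer
        requests.append(request)


# Hypothetical payload: a command followed by a query in one chunk,
# plus the beginning of a third request that is still incomplete.
chunk = "COMMAND [0] EXAMPLE\n\nGET services\nColumns: state\n\nGET ho"
complete, leftover = drain(chunk)
```

Keeping the leftover per client is what allows get_new_requests to serve a client whose socket is not readable but whose buffer already contains the next request.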
Check_MK Git: check_mk: FIX: fix invalid request in livestatus query after reconnect
by Mathias Kettner
Module: check_mk
Branch: master
Commit: 841d4b05e1540975850a7905d3786d53f370b26d
URL:    http://git.mathias-kettner.de/git/?p=check_mk.git;a=commit;h=841d4b05e15409…

Author: Mathias Kettner <mk(a)mathias-kettner.de>
Date:   Tue Jun 4 16:53:13 2013 +0200

FIX: fix invalid request in livestatus query after reconnect

---

 ChangeLog                | 1 +
 web/htdocs/livestatus.py | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/ChangeLog b/ChangeLog
index 1daa238..2c27ccc 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -185,6 +185,7 @@
 Multisite:
     * List of views: Output the alias of a datasource instead of internal name
     * FIX: fix column editor for join columns if "SERVICE:" is l10n'ed
+    * FIX: fix invalid request in livestatus query after reconnect
 
 WATO:
     * FIX: convert editing of global setting to POST. This avoid URL-too-long
diff --git a/web/htdocs/livestatus.py b/web/htdocs/livestatus.py
index bff1cfd..640835a 100644
--- a/web/htdocs/livestatus.py
+++ b/web/htdocs/livestatus.py
@@ -261,6 +261,7 @@ class BaseConnection:
         return self.recv_response(query, add_headers)
 
     def send_query(self, query, add_headers = "", do_reconnect=True):
+        orig_query = query
         if self.socket == None:
             self.connect()
         if not query.endswith("\n"):
@@ -288,7 +289,7 @@ class BaseConnection:
                 # Automatically try to reconnect in case of an error, but
                 # only once.
                 self.connect()
-                self.send_query(query, add_headers, False)
+                self.send_query(orig_query, add_headers, False)
                 return
 
             raise MKLivestatusSocketError("RC1:" + str(e))
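
The fix above retries with orig_query rather than query. The diff does not show everything send_query does to query before sending, so the following Python 3 sketch rests on an assumption: that the method appends a trailing newline and the extra headers to the query. Under that assumption, retrying with the already decorated string would append the headers a second time. FlakyTransport and the keep_original switch are hypothetical and exist only to show both behaviours side by side.

```python
class FlakyTransport:
    """Hypothetical stand-in for a socket that fails on the first send only."""

    def __init__(self):
        self.sent = []
        self.failures_left = 1

    def send(self, data: str) -> None:
        if self.failures_left:
            self.failures_left -= 1
            raise OSError("connection reset")
        self.sent.append(data)


def send_query(transport, query, add_headers="", do_reconnect=True,
               keep_original=True):
    orig_query = query  # remember the caller's query before decorating it
    if not query.endswith("\n"):
        query += "\n"
    query += add_headers + "\n"  # assumed: headers, then a terminating blank line
    try:
        transport.send(query)
    except OSError:
        if not do_reconnect:
            raise
        # Retry once. Passing the decorated query here is the old behaviour.
        retry_with = orig_query if keep_original else query
        send_query(transport, retry_with, add_headers, False, keep_original)


fixed = FlakyTransport()
send_query(fixed, "GET hosts", "OutputFormat: json\n")

broken = FlakyTransport()
send_query(broken, "GET hosts", "OutputFormat: json\n", keep_original=False)
```

In this sketch fixed.sent holds one well-formed request, while broken.sent holds the request followed by a stray second header block.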