From 8655f75babd298902bc581f4329a266183866d67 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Thu, 20 Jan 2022 14:07:09 -0800 Subject: [PATCH] Record stats from polled data on script shutdown If there was history data collected via the --poll-loops option, but not yet used to compute history stats, do so on script shutdown. This should reduce the amount of data lost when the script restarts, for example to reboot the system on which it runs, since --poll-loops allows for collection of history data beyond the 15-minute buffer size the dish (currently) holds. This will only work if the script is shut down via SIGTERM or SIGINT (for example, by interrupting with Control-C). --- dish_common.py | 36 +++++++++++++++++++++++++----------- dish_grpc_influx.py | 10 ++++++++-- dish_grpc_influx2.py | 10 ++++++++-- dish_grpc_sqlite.py | 14 +++++++++----- dish_grpc_text.py | 19 +++++++++++++++++-- 5 files changed, 67 insertions(+), 22 deletions(-) diff --git a/dish_common.py b/dish_common.py index 9b67279..fdb40ba 100644 --- a/dish_common.py +++ b/dish_common.py @@ -168,7 +168,7 @@ class GlobalState: self.context.close() -def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): +def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=False): """Fetch data from the dish, pull it apart and call back with the pieces. This function uses call backs to return the useful data. If need_id is set @@ -189,16 +189,27 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): prototype: add_bulk(bulk_data, count, start_timestamp, start_counter) + flush_history (bool): Optional. If true, run in a special mode that + emits (only) history stats for already polled data, if any, + regardless of --poll-loops state. Intended for script shutdown + operation, in order to flush stats for polled history data which + would otherwise be lost on script restart. Returns: 1 if there were any failures getting data from the dish, otherwise 0. """ - rc = get_status_data(opts, gstate, add_item, add_sequence) + if flush_history and opts.poll_loops < 2: + return 0 + + rc = 0 + + if not flush_history: + rc = get_status_data(opts, gstate, add_item, add_sequence) if opts.history_stats_mode and not rc: - rc = get_history_stats(opts, gstate, add_item, add_sequence) + rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history) - if opts.bulk_mode and add_bulk and not rc: + if not flush_history and opts.bulk_mode and add_bulk and not rc: rc = get_bulk_data(opts, gstate, add_bulk) return rc @@ -263,13 +274,16 @@ def get_status_data(opts, gstate, add_item, add_sequence): return 0 -def get_history_stats(opts, gstate, add_item, add_sequence): +def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): """Fetch history stats. See `get_data` for details.""" - try: - history = starlink_grpc.get_history(context=gstate.context) - except grpc.RpcError as e: - conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) + if flush_history: history = None + else: + try: + history = starlink_grpc.get_history(context=gstate.context) + except grpc.RpcError as e: + conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) + history = None parse_samples = opts.samples if gstate.counter_stats is None else -1 start = gstate.counter_stats if gstate.counter_stats else None @@ -293,14 +307,14 @@ def get_history_stats(opts, gstate, add_item, add_sequence): else: gstate.accum_history = history - if gstate.poll_count < opts.poll_loops - 1: + if gstate.poll_count < opts.poll_loops - 1 and not flush_history: gstate.poll_count += 1 return 0 # This can happen if all polling attempts failed. Verbose output has # already happened, so just return. if gstate.accum_history is None: - return 1 + return 0 if flush_history else 1 gstate.poll_count = 0 diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py index 9cfca79..c25c38d 100644 --- a/dish_grpc_influx.py +++ b/dish_grpc_influx.py @@ -221,7 +221,7 @@ def sync_timebase(opts, gstate): gstate.deferred_points.clear() -def loop_body(opts, gstate): +def loop_body(opts, gstate, shutdown=False): fields = {"status": {}, "ping_stats": {}, "usage": {}} def cb_add_item(key, val, category): @@ -251,7 +251,12 @@ def loop_body(opts, gstate): points[-1]["fields"]["counter"] = counter + count now = time.time() - rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk) + rc = dish_common.get_data(opts, + gstate, + cb_add_item, + cb_add_sequence, + add_bulk=cb_add_bulk, + flush_history=shutdown) if rc: return rc @@ -318,6 +323,7 @@ def main(): except Terminated: pass finally: + loop_body(opts, gstate, shutdown=True) if gstate.points: rc = flush_points(opts, gstate) gstate.influx_client.close() diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py index 9f8d1df..5c8c5e9 100644 --- a/dish_grpc_influx2.py +++ b/dish_grpc_influx2.py @@ -220,7 +220,7 @@ def sync_timebase(opts, gstate): gstate.deferred_points.clear() -def loop_body(opts, gstate): +def loop_body(opts, gstate, shutdown=False): fields = {"status": {}, "ping_stats": {}, "usage": {}} def cb_add_item(key, val, category): @@ -252,7 +252,12 @@ def loop_body(opts, gstate): now = time.time() # work with UTC here # now = time.mktime(datetime.utcnow().timetuple()) - rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk) + rc = dish_common.get_data(opts, + gstate, + cb_add_item, + cb_add_sequence, + add_bulk=cb_add_bulk, + flush_history=shutdown) if rc: return rc @@ -318,6 +323,7 @@ def main(): except Terminated: pass finally: + loop_body(opts, gstate, shutdown=True) if gstate.points: rc = flush_points(opts, gstate) gstate.influx_client.close() diff --git a/dish_grpc_sqlite.py b/dish_grpc_sqlite.py index b6ff651..9688aaf 100644 --- a/dish_grpc_sqlite.py +++ b/dish_grpc_sqlite.py @@ -72,7 +72,7 @@ def parse_args(): group.add_argument("-k", "--skip-query", action="store_true", - help="Skip querying for prior sample write point in bulk mode") + help="Skip querying for prior sample write point in history modes") opts = dish_common.run_arg_parser(parser, need_id=True) @@ -99,7 +99,7 @@ def query_counter(opts, gstate, column, table): return 0, None -def loop_body(opts, gstate): +def loop_body(opts, gstate, shutdown=False): tables = {"status": {}, "ping_stats": {}, "usage": {}} hist_cols = ["time", "id"] hist_rows = [] @@ -123,14 +123,17 @@ def loop_body(opts, gstate): hist_rows.append(row) now = int(time.time()) - rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence) + rc = 0 + + if not shutdown: + rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence) if opts.history_stats_mode and not rc: if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0: _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats") - rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence) + rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown) - if opts.bulk_mode and not rc: + if not shutdown and opts.bulk_mode and not rc: if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0: gstate.timestamp, gstate.counter = query_counter(opts, gstate, "counter", "history") rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk) @@ -298,6 +301,7 @@ def main(): except Terminated: pass finally: + loop_body(opts, gstate, shutdown=True) gstate.sql_conn.close() gstate.shutdown() diff --git a/dish_grpc_text.py b/dish_grpc_text.py index 3f2f406..ef2b696 100644 --- a/dish_grpc_text.py +++ b/dish_grpc_text.py @@ -13,6 +13,7 @@ the alert_detail mode, you can use the alerts bitmask in the status group. from datetime import datetime import logging +import signal import sys import time @@ -57,6 +58,15 @@ VERBOSE_FIELD_MAP = { } +class Terminated(Exception): + pass + + +def handle_sigterm(signum, frame): + # Turn SIGTERM into an exception so main loop can clean up + raise Terminated + + def parse_args(): parser = dish_common.create_arg_parser( output_description= @@ -127,7 +137,7 @@ def print_header(opts): return 0 -def loop_body(opts, gstate): +def loop_body(opts, gstate, shutdown=False): if opts.verbose: csv_data = [] else: @@ -175,7 +185,8 @@ def loop_body(opts, gstate): gstate, cb_data_add_item, cb_data_add_sequence, - add_bulk=cb_add_bulk) + add_bulk=cb_add_bulk, + flush_history=shutdown) if opts.verbose: if csv_data: @@ -200,6 +211,7 @@ def main(): sys.exit(rc) gstate = dish_common.GlobalState(target=opts.target) + signal.signal(signal.SIGTERM, handle_sigterm) try: next_loop = time.monotonic() @@ -211,7 +223,10 @@ def main(): time.sleep(next_loop - now) else: break + except Terminated: + pass finally: + loop_body(opts, gstate, shutdown=True) gstate.shutdown() sys.exit(rc)