diff --git a/dish_common.py b/dish_common.py
index 9b67279..fdb40ba 100644
--- a/dish_common.py
+++ b/dish_common.py
@@ -168,7 +168,7 @@ class GlobalState:
         self.context.close()
 
 
-def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
+def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=False):
     """Fetch data from the dish, pull it apart and call back with the pieces.
 
     This function uses call backs to return the useful data. If need_id is set
@@ -189,16 +189,27 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
 
             prototype: add_bulk(bulk_data, count, start_timestamp, start_counter)
 
+        flush_history (bool): Optional. If true, run in a special mode that
+            emits (only) history stats for already polled data, if any,
+            regardless of --poll-loops state. Intended for script shutdown
+            operation, in order to flush stats for polled history data which
+            would otherwise be lost on script restart.
 
     Returns:
         1 if there were any failures getting data from the dish, otherwise 0.
     """
-    rc = get_status_data(opts, gstate, add_item, add_sequence)
+    if flush_history and opts.poll_loops < 2:
+        return 0
+
+    rc = 0
+
+    if not flush_history:
+        rc = get_status_data(opts, gstate, add_item, add_sequence)
 
     if opts.history_stats_mode and not rc:
-        rc = get_history_stats(opts, gstate, add_item, add_sequence)
+        rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history)
 
-    if opts.bulk_mode and add_bulk and not rc:
+    if not flush_history and opts.bulk_mode and add_bulk and not rc:
         rc = get_bulk_data(opts, gstate, add_bulk)
 
     return rc
@@ -263,13 +274,16 @@ def get_status_data(opts, gstate, add_item, add_sequence):
     return 0
 
 
-def get_history_stats(opts, gstate, add_item, add_sequence):
+def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
     """Fetch history stats. See `get_data` for details."""
-    try:
-        history = starlink_grpc.get_history(context=gstate.context)
-    except grpc.RpcError as e:
-        conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
+    if flush_history:
         history = None
+    else:
+        try:
+            history = starlink_grpc.get_history(context=gstate.context)
+        except grpc.RpcError as e:
+            conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
+            history = None
 
     parse_samples = opts.samples if gstate.counter_stats is None else -1
     start = gstate.counter_stats if gstate.counter_stats else None
@@ -293,14 +307,14 @@ def get_history_stats(opts, gstate, add_item, add_sequence):
     else:
         gstate.accum_history = history
 
-    if gstate.poll_count < opts.poll_loops - 1:
+    if gstate.poll_count < opts.poll_loops - 1 and not flush_history:
         gstate.poll_count += 1
         return 0
 
     # This can happen if all polling attempts failed. Verbose output has
     # already happened, so just return.
     if gstate.accum_history is None:
-        return 1
+        return 0 if flush_history else 1
 
     gstate.poll_count = 0
 
diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py
index 9cfca79..c25c38d 100644
--- a/dish_grpc_influx.py
+++ b/dish_grpc_influx.py
@@ -221,7 +221,7 @@ def sync_timebase(opts, gstate):
     gstate.deferred_points.clear()
 
 
-def loop_body(opts, gstate):
+def loop_body(opts, gstate, shutdown=False):
     fields = {"status": {}, "ping_stats": {}, "usage": {}}
 
     def cb_add_item(key, val, category):
@@ -251,7 +251,12 @@ def loop_body(opts, gstate):
             points[-1]["fields"]["counter"] = counter + count
 
     now = time.time()
-    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk)
+    rc = dish_common.get_data(opts,
+                              gstate,
+                              cb_add_item,
+                              cb_add_sequence,
+                              add_bulk=cb_add_bulk,
+                              flush_history=shutdown)
     if rc:
         return rc
 
@@ -318,6 +323,7 @@ def main():
     except Terminated:
         pass
     finally:
+        loop_body(opts, gstate, shutdown=True)
         if gstate.points:
             rc = flush_points(opts, gstate)
         gstate.influx_client.close()
diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py
index 9f8d1df..5c8c5e9 100644
--- a/dish_grpc_influx2.py
+++ b/dish_grpc_influx2.py
@@ -220,7 +220,7 @@ def sync_timebase(opts, gstate):
     gstate.deferred_points.clear()
 
 
-def loop_body(opts, gstate):
+def loop_body(opts, gstate, shutdown=False):
     fields = {"status": {}, "ping_stats": {}, "usage": {}}
 
     def cb_add_item(key, val, category):
@@ -252,7 +252,12 @@ def loop_body(opts, gstate):
     now = time.time()
     # work with UTC here
     # now = time.mktime(datetime.utcnow().timetuple())
-    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk)
+    rc = dish_common.get_data(opts,
+                              gstate,
+                              cb_add_item,
+                              cb_add_sequence,
+                              add_bulk=cb_add_bulk,
+                              flush_history=shutdown)
     if rc:
         return rc
 
@@ -318,6 +323,7 @@ def main():
     except Terminated:
         pass
     finally:
+        loop_body(opts, gstate, shutdown=True)
         if gstate.points:
             rc = flush_points(opts, gstate)
         gstate.influx_client.close()
diff --git a/dish_grpc_sqlite.py b/dish_grpc_sqlite.py
index b6ff651..9688aaf 100644
--- a/dish_grpc_sqlite.py
+++ b/dish_grpc_sqlite.py
@@ -72,7 +72,7 @@ def parse_args():
     group.add_argument("-k",
                        "--skip-query",
                        action="store_true",
-                       help="Skip querying for prior sample write point in bulk mode")
+                       help="Skip querying for prior sample write point in history modes")
 
     opts = dish_common.run_arg_parser(parser, need_id=True)
 
@@ -99,7 +99,7 @@ def query_counter(opts, gstate, column, table):
     return 0, None
 
 
-def loop_body(opts, gstate):
+def loop_body(opts, gstate, shutdown=False):
     tables = {"status": {}, "ping_stats": {}, "usage": {}}
     hist_cols = ["time", "id"]
     hist_rows = []
@@ -123,14 +123,17 @@ def loop_body(opts, gstate):
             hist_rows.append(row)
 
     now = int(time.time())
-    rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
+    rc = 0
+
+    if not shutdown:
+        rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
 
     if opts.history_stats_mode and not rc:
         if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
             _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
-        rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence)
+        rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown)
 
-    if opts.bulk_mode and not rc:
+    if not shutdown and opts.bulk_mode and not rc:
         if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
             gstate.timestamp, gstate.counter = query_counter(opts, gstate, "counter", "history")
         rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)
@@ -298,6 +301,7 @@ def main():
     except Terminated:
         pass
     finally:
+        loop_body(opts, gstate, shutdown=True)
         gstate.sql_conn.close()
         gstate.shutdown()
 
diff --git a/dish_grpc_text.py b/dish_grpc_text.py
index 3f2f406..ef2b696 100644
--- a/dish_grpc_text.py
+++ b/dish_grpc_text.py
@@ -13,6 +13,7 @@ the alert_detail mode, you can use the alerts bitmask in the status group.
 
 from datetime import datetime
 import logging
+import signal
 import sys
 import time
 
@@ -57,6 +58,15 @@ VERBOSE_FIELD_MAP = {
 }
 
 
+class Terminated(Exception):
+    pass
+
+
+def handle_sigterm(signum, frame):
+    # Turn SIGTERM into an exception so main loop can clean up
+    raise Terminated
+
+
 def parse_args():
     parser = dish_common.create_arg_parser(
         output_description=
@@ -127,7 +137,7 @@ def print_header(opts):
     return 0
 
 
-def loop_body(opts, gstate):
+def loop_body(opts, gstate, shutdown=False):
     if opts.verbose:
         csv_data = []
     else:
@@ -175,7 +185,8 @@ def loop_body(opts, gstate):
                               gstate,
                               cb_data_add_item,
                               cb_data_add_sequence,
-                              add_bulk=cb_add_bulk)
+                              add_bulk=cb_add_bulk,
+                              flush_history=shutdown)
 
     if opts.verbose:
         if csv_data:
@@ -200,6 +211,7 @@ def main():
         sys.exit(rc)
 
     gstate = dish_common.GlobalState(target=opts.target)
+    signal.signal(signal.SIGTERM, handle_sigterm)
 
     try:
         next_loop = time.monotonic()
@@ -211,7 +223,10 @@ def main():
                 time.sleep(next_loop - now)
             else:
                 break
+    except Terminated:
+        pass
     finally:
+        loop_body(opts, gstate, shutdown=True)
         gstate.shutdown()
 
     sys.exit(rc)