Record stats from polled data on script shutdown

If history data was collected via the --poll-loops option but has not yet been used to compute history stats, compute those stats on script shutdown. This should reduce the amount of data lost when the script restarts, for example to reboot the system on which it runs, since --poll-loops allows for collection of history data beyond the 15-minute buffer size the dish (currently) holds.

This will only work if the script is shut down via SIGTERM or SIGINT (the latter is what is sent when the script is interrupted with Control-C).
This commit is contained in:
sparky8512 2022-01-20 14:07:09 -08:00
parent 7dbb47ab40
commit 8655f75bab
5 changed files with 67 additions and 22 deletions

View file

@ -168,7 +168,7 @@ class GlobalState:
self.context.close() self.context.close()
def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=False):
"""Fetch data from the dish, pull it apart and call back with the pieces. """Fetch data from the dish, pull it apart and call back with the pieces.
This function uses call backs to return the useful data. If need_id is set This function uses call backs to return the useful data. If need_id is set
@ -189,16 +189,27 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
prototype: prototype:
add_bulk(bulk_data, count, start_timestamp, start_counter) add_bulk(bulk_data, count, start_timestamp, start_counter)
flush_history (bool): Optional. If true, run in a special mode that
emits (only) history stats for already polled data, if any,
regardless of --poll-loops state. Intended for script shutdown
operation, in order to flush stats for polled history data which
would otherwise be lost on script restart.
Returns: Returns:
1 if there were any failures getting data from the dish, otherwise 0. 1 if there were any failures getting data from the dish, otherwise 0.
""" """
if flush_history and opts.poll_loops < 2:
return 0
rc = 0
if not flush_history:
rc = get_status_data(opts, gstate, add_item, add_sequence) rc = get_status_data(opts, gstate, add_item, add_sequence)
if opts.history_stats_mode and not rc: if opts.history_stats_mode and not rc:
rc = get_history_stats(opts, gstate, add_item, add_sequence) rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history)
if opts.bulk_mode and add_bulk and not rc: if not flush_history and opts.bulk_mode and add_bulk and not rc:
rc = get_bulk_data(opts, gstate, add_bulk) rc = get_bulk_data(opts, gstate, add_bulk)
return rc return rc
@ -263,8 +274,11 @@ def get_status_data(opts, gstate, add_item, add_sequence):
return 0 return 0
def get_history_stats(opts, gstate, add_item, add_sequence): def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
"""Fetch history stats. See `get_data` for details.""" """Fetch history stats. See `get_data` for details."""
if flush_history:
history = None
else:
try: try:
history = starlink_grpc.get_history(context=gstate.context) history = starlink_grpc.get_history(context=gstate.context)
except grpc.RpcError as e: except grpc.RpcError as e:
@ -293,14 +307,14 @@ def get_history_stats(opts, gstate, add_item, add_sequence):
else: else:
gstate.accum_history = history gstate.accum_history = history
if gstate.poll_count < opts.poll_loops - 1: if gstate.poll_count < opts.poll_loops - 1 and not flush_history:
gstate.poll_count += 1 gstate.poll_count += 1
return 0 return 0
# This can happen if all polling attempts failed. Verbose output has # This can happen if all polling attempts failed. Verbose output has
# already happened, so just return. # already happened, so just return.
if gstate.accum_history is None: if gstate.accum_history is None:
return 1 return 0 if flush_history else 1
gstate.poll_count = 0 gstate.poll_count = 0

View file

@ -221,7 +221,7 @@ def sync_timebase(opts, gstate):
gstate.deferred_points.clear() gstate.deferred_points.clear()
def loop_body(opts, gstate): def loop_body(opts, gstate, shutdown=False):
fields = {"status": {}, "ping_stats": {}, "usage": {}} fields = {"status": {}, "ping_stats": {}, "usage": {}}
def cb_add_item(key, val, category): def cb_add_item(key, val, category):
@ -251,7 +251,12 @@ def loop_body(opts, gstate):
points[-1]["fields"]["counter"] = counter + count points[-1]["fields"]["counter"] = counter + count
now = time.time() now = time.time()
rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk) rc = dish_common.get_data(opts,
gstate,
cb_add_item,
cb_add_sequence,
add_bulk=cb_add_bulk,
flush_history=shutdown)
if rc: if rc:
return rc return rc
@ -318,6 +323,7 @@ def main():
except Terminated: except Terminated:
pass pass
finally: finally:
loop_body(opts, gstate, shutdown=True)
if gstate.points: if gstate.points:
rc = flush_points(opts, gstate) rc = flush_points(opts, gstate)
gstate.influx_client.close() gstate.influx_client.close()

View file

@ -220,7 +220,7 @@ def sync_timebase(opts, gstate):
gstate.deferred_points.clear() gstate.deferred_points.clear()
def loop_body(opts, gstate): def loop_body(opts, gstate, shutdown=False):
fields = {"status": {}, "ping_stats": {}, "usage": {}} fields = {"status": {}, "ping_stats": {}, "usage": {}}
def cb_add_item(key, val, category): def cb_add_item(key, val, category):
@ -252,7 +252,12 @@ def loop_body(opts, gstate):
now = time.time() now = time.time()
# work with UTC here # work with UTC here
# now = time.mktime(datetime.utcnow().timetuple()) # now = time.mktime(datetime.utcnow().timetuple())
rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk) rc = dish_common.get_data(opts,
gstate,
cb_add_item,
cb_add_sequence,
add_bulk=cb_add_bulk,
flush_history=shutdown)
if rc: if rc:
return rc return rc
@ -318,6 +323,7 @@ def main():
except Terminated: except Terminated:
pass pass
finally: finally:
loop_body(opts, gstate, shutdown=True)
if gstate.points: if gstate.points:
rc = flush_points(opts, gstate) rc = flush_points(opts, gstate)
gstate.influx_client.close() gstate.influx_client.close()

View file

@ -72,7 +72,7 @@ def parse_args():
group.add_argument("-k", group.add_argument("-k",
"--skip-query", "--skip-query",
action="store_true", action="store_true",
help="Skip querying for prior sample write point in bulk mode") help="Skip querying for prior sample write point in history modes")
opts = dish_common.run_arg_parser(parser, need_id=True) opts = dish_common.run_arg_parser(parser, need_id=True)
@ -99,7 +99,7 @@ def query_counter(opts, gstate, column, table):
return 0, None return 0, None
def loop_body(opts, gstate): def loop_body(opts, gstate, shutdown=False):
tables = {"status": {}, "ping_stats": {}, "usage": {}} tables = {"status": {}, "ping_stats": {}, "usage": {}}
hist_cols = ["time", "id"] hist_cols = ["time", "id"]
hist_rows = [] hist_rows = []
@ -123,14 +123,17 @@ def loop_body(opts, gstate):
hist_rows.append(row) hist_rows.append(row)
now = int(time.time()) now = int(time.time())
rc = 0
if not shutdown:
rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence) rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
if opts.history_stats_mode and not rc: if opts.history_stats_mode and not rc:
if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0: if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
_, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats") _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence) rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown)
if opts.bulk_mode and not rc: if not shutdown and opts.bulk_mode and not rc:
if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0: if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
gstate.timestamp, gstate.counter = query_counter(opts, gstate, "counter", "history") gstate.timestamp, gstate.counter = query_counter(opts, gstate, "counter", "history")
rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk) rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)
@ -298,6 +301,7 @@ def main():
except Terminated: except Terminated:
pass pass
finally: finally:
loop_body(opts, gstate, shutdown=True)
gstate.sql_conn.close() gstate.sql_conn.close()
gstate.shutdown() gstate.shutdown()

View file

@ -13,6 +13,7 @@ the alert_detail mode, you can use the alerts bitmask in the status group.
from datetime import datetime from datetime import datetime
import logging import logging
import signal
import sys import sys
import time import time
@ -57,6 +58,15 @@ VERBOSE_FIELD_MAP = {
} }
class Terminated(Exception):
pass
def handle_sigterm(signum, frame):
# Turn SIGTERM into an exception so main loop can clean up
raise Terminated
def parse_args(): def parse_args():
parser = dish_common.create_arg_parser( parser = dish_common.create_arg_parser(
output_description= output_description=
@ -127,7 +137,7 @@ def print_header(opts):
return 0 return 0
def loop_body(opts, gstate): def loop_body(opts, gstate, shutdown=False):
if opts.verbose: if opts.verbose:
csv_data = [] csv_data = []
else: else:
@ -175,7 +185,8 @@ def loop_body(opts, gstate):
gstate, gstate,
cb_data_add_item, cb_data_add_item,
cb_data_add_sequence, cb_data_add_sequence,
add_bulk=cb_add_bulk) add_bulk=cb_add_bulk,
flush_history=shutdown)
if opts.verbose: if opts.verbose:
if csv_data: if csv_data:
@ -200,6 +211,7 @@ def main():
sys.exit(rc) sys.exit(rc)
gstate = dish_common.GlobalState(target=opts.target) gstate = dish_common.GlobalState(target=opts.target)
signal.signal(signal.SIGTERM, handle_sigterm)
try: try:
next_loop = time.monotonic() next_loop = time.monotonic()
@ -211,7 +223,10 @@ def main():
time.sleep(next_loop - now) time.sleep(next_loop - now)
else: else:
break break
except Terminated:
pass
finally: finally:
loop_body(opts, gstate, shutdown=True)
gstate.shutdown() gstate.shutdown()
sys.exit(rc) sys.exit(rc)