From 38987054b9a0649259228f7f466d3c6acb9c95e6 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Fri, 19 Feb 2021 10:56:20 -0800 Subject: [PATCH] Add option to poll history more frequently This further complicates the code, for functionality that probably only I care about, but when computing stats for relatively long time intervals, it really hurts when the dish reboots and up to an entire time period's worth of data is lost at exactly the point where it may have been having interesting behavior. --- dish_common.py | 109 +++++++++++++++++++++++++++++++++-------------- starlink_grpc.py | 26 ++++++----- 2 files changed, 93 insertions(+), 42 deletions(-) diff --git a/dish_common.py b/dish_common.py index 3990265..562aa0a 100644 --- a/dish_common.py +++ b/dish_common.py @@ -17,6 +17,8 @@ import logging import re import time +import grpc + import starlink_grpc BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$") @@ -63,6 +65,13 @@ def create_arg_parser(output_description, bulk_history=True): const=-1, dest="samples", help="Parse all valid samples") + group.add_argument("-o", + "--poll-loops", + type=int, + help="Poll history for N loops or until reboot detected, before computing " + "history stats; this allows for a smaller loop interval with less loss of " + "data when the dish reboots", + metavar="N") if bulk_history: sample_help = ("Number of data samples to parse; normally applies to first loop " "iteration only, default: -1 in bulk mode, loop interval if loop interval " @@ -106,14 +115,22 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False): opts = parser.parse_args() + if opts.loop_interval <= 0.0 or opts.poll_loops is None: + opts.poll_loops = 1 + elif opts.poll_loops < 2: + parser.error("Poll loops arg must be 2 or greater to be meaningful") + # for convenience, set flags for whether any mode in a group is selected opts.satus_mode = bool(set(STATUS_MODES).intersection(opts.mode)) opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode)) opts.bulk_mode = "bulk_history" in opts.mode if opts.samples is None: - opts.samples = -1 if opts.bulk_mode else int( - opts.loop_interval) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT + opts.samples = int(opts.loop_interval * + opts.poll_loops) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT + opts.bulk_samples = -1 + else: + opts.bulk_samples = opts.samples opts.no_stdout_errors = no_stdout_errors opts.need_id = need_id @@ -141,6 +158,8 @@ class GlobalState: self.timestamp = None self.dish_id = None self.context = starlink_grpc.ChannelContext(target=target) + self.poll_count = 0 + self.prev_history = None def shutdown(self): self.context.close() @@ -212,31 +231,19 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): if opts.verbose: print("Using dish ID: " + gstate.dish_id) - if opts.history_stats_mode: - start = gstate.counter_stats - parse_samples = opts.samples if start is None else -1 - try: - groups = starlink_grpc.history_stats(parse_samples, - start=start, - verbose=opts.verbose, - context=gstate.context) - general, ping, runlen, latency, loaded, usage = groups[0:6] - except starlink_grpc.GrpcError as e: - conn_error(opts, "Failure getting ping stats: %s", str(e)) + if not opts.history_stats_mode and not (opts.bulk_mode and add_bulk): + return 0 + + try: + history = starlink_grpc.get_history(context=gstate.context) + except grpc.RpcError as e: + conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) + history = gstate.prev_history + if history is None: return 1 - add_data(general, "ping_stats") - if "ping_drop" in opts.mode: - add_data(ping, "ping_stats") - if "ping_run_length" in opts.mode: - add_data(runlen, "ping_stats") - if "ping_latency" in opts.mode: - add_data(latency, "ping_stats") - if "ping_loaded_latency" in opts.mode: - add_data(loaded, "ping_stats") - if "usage" in opts.mode: - add_data(usage, "usage") - if not opts.no_counter: - gstate.counter_stats = general["end_counter"] + + if opts.history_stats_mode: + get_history_stats(opts, gstate, add_data, history) if opts.bulk_mode and add_bulk: return get_bulk_data(opts, gstate, add_bulk) @@ -244,16 +251,53 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): return 0 -def get_bulk_data(opts, gstate, add_bulk): - """Fetch bulk data. See `get_data` for details. +def get_history_stats(opts, gstate, add_data, history): + """Fetch history stats. See `get_data` for details.""" + if history and gstate.prev_history and history.current < gstate.prev_history.current: + if opts.verbose: + print("Dish reboot detected. Restarting loop polling count.") + # process saved history data and keep the new data for next time + history, gstate.prev_history = gstate.prev_history, history + # the newly saved data counts as a loop, so advance 1 past reset point + gstate.poll_count = opts.poll_loops - 2 + elif gstate.poll_count > 0: + gstate.poll_count -= 1 + gstate.prev_history = history + return + else: + # if no --poll-loops option set, opts.poll_loops gets set to 1, so + # poll_count will always be 0 and prev_history will always be None + gstate.prev_history = None + gstate.poll_count = opts.poll_loops - 1 - This was split out in case bulk data needs to be handled separately, for - example, if dish_id needs to be known before calling. - """ + start = gstate.counter_stats + parse_samples = opts.samples if start is None else -1 + groups = starlink_grpc.history_stats(parse_samples, + start=start, + verbose=opts.verbose, + history=history) + general, ping, runlen, latency, loaded, usage = groups[0:6] + add_data(general, "ping_stats") + if "ping_drop" in opts.mode: + add_data(ping, "ping_stats") + if "ping_run_length" in opts.mode: + add_data(runlen, "ping_stats") + if "ping_latency" in opts.mode: + add_data(latency, "ping_stats") + if "ping_loaded_latency" in opts.mode: + add_data(loaded, "ping_stats") + if "usage" in opts.mode: + add_data(usage, "usage") + if not opts.no_counter: + gstate.counter_stats = general["end_counter"] + + +def get_bulk_data(opts, gstate, add_bulk): + """Fetch bulk data. See `get_data` for details.""" before = time.time() start = gstate.counter - parse_samples = opts.samples if start is None else -1 + parse_samples = opts.bulk_samples if start is None else -1 try: general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, @@ -288,3 +332,4 @@ def get_bulk_data(opts, gstate, add_bulk): gstate.counter = new_counter gstate.timestamp = timestamp + parsed_samples + return 0 diff --git a/starlink_grpc.py b/starlink_grpc.py index ffa5e65..c0b906f 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -797,7 +797,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False): return sample_range, current - start, current -def history_bulk_data(parse_samples, start=None, verbose=False, context=None): +def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None): """Fetch history data for a range of samples. Args: @@ -818,6 +818,8 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None): verbose (bool): Optionally produce verbose output. context (ChannelContext): Optionally provide a channel for reuse across repeated calls. + history: Optionally provide the history data to use instead of fetching + it, from a prior call to `get_history`. Returns: A tuple with 2 dicts, the first mapping general data names to their @@ -832,10 +834,11 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None): GrpcError: Failed getting history info from the Starlink user terminal. """ - try: - history = get_history(context) - except grpc.RpcError as e: - raise GrpcError(e) + if history is None: + try: + history = get_history(context) + except grpc.RpcError as e: + raise GrpcError(e) sample_range, parsed_samples, current = _compute_sample_range(history, parse_samples, @@ -879,7 +882,7 @@ def history_ping_stats(parse_samples, verbose=False, context=None): return history_stats(parse_samples, verbose=verbose, context=context)[0:3] -def history_stats(parse_samples, start=None, verbose=False, context=None): +def history_stats(parse_samples, start=None, verbose=False, context=None, history=None): """Fetch, parse, and compute the packet loss stats. Note: @@ -891,6 +894,8 @@ def history_stats(parse_samples, start=None, verbose=False, context=None): verbose (bool): Optionally produce verbose output. context (ChannelContext): Optionally provide a channel for reuse across repeated calls. + history: Optionally provide the history data to use instead of fetching + it, from a prior call to `get_history`. Returns: A tuple with 6 dicts, mapping general data names, ping drop stat @@ -907,10 +912,11 @@ def history_stats(parse_samples, start=None, verbose=False, context=None): GrpcError: Failed getting history info from the Starlink user terminal. """ - try: - history = get_history(context) - except grpc.RpcError as e: - raise GrpcError(e) + if history is None: + try: + history = get_history(context) + except grpc.RpcError as e: + raise GrpcError(e) sample_range, parse_samples, current = _compute_sample_range(history, parse_samples,