diff --git a/dish_common.py b/dish_common.py index 3990265..562aa0a 100644 --- a/dish_common.py +++ b/dish_common.py @@ -17,6 +17,8 @@ import logging import re import time +import grpc + import starlink_grpc BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$") @@ -63,6 +65,13 @@ def create_arg_parser(output_description, bulk_history=True): const=-1, dest="samples", help="Parse all valid samples") + group.add_argument("-o", + "--poll-loops", + type=int, + help="Poll history for N loops or until reboot detected, before computing " + "history stats; this allows for a smaller loop interval with less loss of " + "data when the dish reboots", + metavar="N") if bulk_history: sample_help = ("Number of data samples to parse; normally applies to first loop " "iteration only, default: -1 in bulk mode, loop interval if loop interval " @@ -106,14 +115,22 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False): opts = parser.parse_args() + if opts.loop_interval <= 0.0 or opts.poll_loops is None: + opts.poll_loops = 1 + elif opts.poll_loops < 2: + parser.error("Poll loops arg must be 2 or greater to be meaningful") + # for convenience, set flags for whether any mode in a group is selected opts.satus_mode = bool(set(STATUS_MODES).intersection(opts.mode)) opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode)) opts.bulk_mode = "bulk_history" in opts.mode if opts.samples is None: - opts.samples = -1 if opts.bulk_mode else int( - opts.loop_interval) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT + opts.samples = int(opts.loop_interval * + opts.poll_loops) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT + opts.bulk_samples = -1 + else: + opts.bulk_samples = opts.samples opts.no_stdout_errors = no_stdout_errors opts.need_id = need_id @@ -141,6 +158,8 @@ class GlobalState: self.timestamp = None self.dish_id = None self.context = starlink_grpc.ChannelContext(target=target) + self.poll_count = 0 + self.prev_history = None def shutdown(self): self.context.close() @@ -212,31 +231,19 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): if opts.verbose: print("Using dish ID: " + gstate.dish_id) - if opts.history_stats_mode: - start = gstate.counter_stats - parse_samples = opts.samples if start is None else -1 - try: - groups = starlink_grpc.history_stats(parse_samples, - start=start, - verbose=opts.verbose, - context=gstate.context) - general, ping, runlen, latency, loaded, usage = groups[0:6] - except starlink_grpc.GrpcError as e: - conn_error(opts, "Failure getting ping stats: %s", str(e)) + if not opts.history_stats_mode and not (opts.bulk_mode and add_bulk): + return 0 + + try: + history = starlink_grpc.get_history(context=gstate.context) + except grpc.RpcError as e: + conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) + history = gstate.prev_history + if history is None: return 1 - add_data(general, "ping_stats") - if "ping_drop" in opts.mode: - add_data(ping, "ping_stats") - if "ping_run_length" in opts.mode: - add_data(runlen, "ping_stats") - if "ping_latency" in opts.mode: - add_data(latency, "ping_stats") - if "ping_loaded_latency" in opts.mode: - add_data(loaded, "ping_stats") - if "usage" in opts.mode: - add_data(usage, "usage") - if not opts.no_counter: - gstate.counter_stats = general["end_counter"] + + if opts.history_stats_mode: + get_history_stats(opts, gstate, add_data, history) if opts.bulk_mode and add_bulk: return get_bulk_data(opts, gstate, add_bulk) @@ -244,16 +251,53 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): return 0 -def get_bulk_data(opts, gstate, add_bulk): - """Fetch bulk data. See `get_data` for details. +def get_history_stats(opts, gstate, add_data, history): + """Fetch history stats. See `get_data` for details.""" + if history and gstate.prev_history and history.current < gstate.prev_history.current: + if opts.verbose: + print("Dish reboot detected. Restarting loop polling count.") + # process saved history data and keep the new data for next time + history, gstate.prev_history = gstate.prev_history, history + # the newly saved data counts as a loop, so advance 1 past reset point + gstate.poll_count = opts.poll_loops - 2 + elif gstate.poll_count > 0: + gstate.poll_count -= 1 + gstate.prev_history = history + return + else: + # if no --poll-loops option set, opts.poll_loops gets set to 1, so + # poll_count will always be 0 and prev_history will always be None + gstate.prev_history = None + gstate.poll_count = opts.poll_loops - 1 - This was split out in case bulk data needs to be handled separately, for - example, if dish_id needs to be known before calling. - """ + start = gstate.counter_stats + parse_samples = opts.samples if start is None else -1 + groups = starlink_grpc.history_stats(parse_samples, + start=start, + verbose=opts.verbose, + history=history) + general, ping, runlen, latency, loaded, usage = groups[0:6] + add_data(general, "ping_stats") + if "ping_drop" in opts.mode: + add_data(ping, "ping_stats") + if "ping_run_length" in opts.mode: + add_data(runlen, "ping_stats") + if "ping_latency" in opts.mode: + add_data(latency, "ping_stats") + if "ping_loaded_latency" in opts.mode: + add_data(loaded, "ping_stats") + if "usage" in opts.mode: + add_data(usage, "usage") + if not opts.no_counter: + gstate.counter_stats = general["end_counter"] + + +def get_bulk_data(opts, gstate, add_bulk): + """Fetch bulk data. See `get_data` for details.""" before = time.time() start = gstate.counter - parse_samples = opts.samples if start is None else -1 + parse_samples = opts.bulk_samples if start is None else -1 try: general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, @@ -288,3 +332,4 @@ def get_bulk_data(opts, gstate, add_bulk): gstate.counter = new_counter gstate.timestamp = timestamp + parsed_samples + return 0 diff --git a/starlink_grpc.py b/starlink_grpc.py index ffa5e65..c0b906f 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -797,7 +797,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False): return sample_range, current - start, current -def history_bulk_data(parse_samples, start=None, verbose=False, context=None): +def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None): """Fetch history data for a range of samples. Args: @@ -818,6 +818,8 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None): verbose (bool): Optionally produce verbose output. context (ChannelContext): Optionally provide a channel for reuse across repeated calls. + history: Optionally provide the history data to use instead of fetching + it, from a prior call to `get_history`. Returns: A tuple with 2 dicts, the first mapping general data names to their @@ -832,10 +834,11 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None): GrpcError: Failed getting history info from the Starlink user terminal. """ - try: - history = get_history(context) - except grpc.RpcError as e: - raise GrpcError(e) + if history is None: + try: + history = get_history(context) + except grpc.RpcError as e: + raise GrpcError(e) sample_range, parsed_samples, current = _compute_sample_range(history, parse_samples, @@ -879,7 +882,7 @@ def history_ping_stats(parse_samples, verbose=False, context=None): return history_stats(parse_samples, verbose=verbose, context=context)[0:3] -def history_stats(parse_samples, start=None, verbose=False, context=None): +def history_stats(parse_samples, start=None, verbose=False, context=None, history=None): """Fetch, parse, and compute the packet loss stats. Note: @@ -891,6 +894,8 @@ def history_stats(parse_samples, start=None, verbose=False, context=None): verbose (bool): Optionally produce verbose output. context (ChannelContext): Optionally provide a channel for reuse across repeated calls. + history: Optionally provide the history data to use instead of fetching + it, from a prior call to `get_history`. Returns: A tuple with 6 dicts, mapping general data names, ping drop stat @@ -907,10 +912,11 @@ def history_stats(parse_samples, start=None, verbose=False, context=None): GrpcError: Failed getting history info from the Starlink user terminal. """ - try: - history = get_history(context) - except grpc.RpcError as e: - raise GrpcError(e) + if history is None: + try: + history = get_history(context) + except grpc.RpcError as e: + raise GrpcError(e) sample_range, parse_samples, current = _compute_sample_range(history, parse_samples,