From 258a33d62da8ce3f1894552f11756b6c0082dee0 Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Sun, 21 Feb 2021 13:49:45 -0800
Subject: [PATCH] Port grpc history features to JSON parser script

This brings most of the history-related functionality implemented in the
grpc scripts to the JSON version, but only for text output. It also
renames parseJsonHistory.py to dish_json_text.py, which removes the last
remaining complaint from pylint about the module name not conforming to
style conventions.

A lot of this is just duplicated code from dish_common and
dish_grpc_text, just simplified a little where some of the flexibility
wasn't needed.

This removes compatibility with Python 2.7, because I didn't feel like
reimplementing statistics.pstdev and didn't think such compatibility was
particularly important.
---
 README.md           |  14 +-
 dish_json_text.py   | 285 ++++++++++++++++++++++++++++++++++++++
 parseJsonHistory.py | 120 ----------------
 starlink_json.py    | 323 +++++++++++++++++++++++++++++++++++++-------
 4 files changed, 566 insertions(+), 176 deletions(-)
 create mode 100644 dish_json_text.py
 delete mode 100644 parseJsonHistory.py

diff --git a/README.md b/README.md
index cf64cc2..fca5013 100644
--- a/README.md
+++ b/README.md
@@ -5,9 +5,9 @@ For more information on what Starlink is, see [starlink.com](https://www.starlin
 
 ## Prerequisites
 
-Most of the scripts here are [Python](https://www.python.org/) scripts. To use them, you will either need Python installed on your system or you can use the Docker image. If you use the Docker image, you can skip the rest of the prerequisites other than making sure the dish IP is reachable and Docker itself. For Linux systems, the python package from your distribution should be fine, as long as it is Python 3. The JSON script should actually work with Python 2.7, but the grpc scripts all require Python 3 (and Python 2.7 is past end-of-life, so is not recommended anyway).
+Most of the scripts here are [Python](https://www.python.org/) scripts. To use them, you will either need Python installed on your system or you can use the Docker image. If you use the Docker image, you can skip the rest of the prerequisites other than making sure the dish IP is reachable and Docker itself. For Linux systems, the python package from your distribution should be fine, as long as it is Python 3.
 
-All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts.
+All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. See [here](https://github.com/starlink-community/knowledge-base/wiki#using-your-own-router) for more detail on this.
 
 Running the scripts within a [Docker](https://www.docker.com/) container requires Docker to be installed. Information about how to install that can be found at https://docs.docker.com/engine/install/
 
@@ -83,17 +83,15 @@ Some of the scripts (currently only the InfluxDB one) also support specifying op
 
 ### The JSON parser script
 
-`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
+`dish_json_text.py` is similar to `dish_grpc_text.py`, but it takes JSON format input from a file instead of pulling it directly from the dish via grpc call. It also does not support the status info modes, because those are easy enough to interpret directly from the JSON data. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
 ```shell script
-grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py
+grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python3 dish_json_text.py ping_drop
 ```
 
 For more usage options, run:
 ```shell script
-python parseJsonHistory.py -h
+python3 dish_json_text.py -h
 ```
-When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script (or more likely the parsing logic it uses, which now resides in `starlink_json.py`) could be used as a starting point or example of how to iterate through it.
-
 The one bit of functionality this script has over the grpc scripts is that it supports capturing the grpcurl output to a file and reading from that, which may be useful if you're collecting data in one place but analyzing it in another. Otherwise, it's probably better to use `dish_grpc_text.py`, described above.
 
 ### Other scripts
@@ -139,8 +137,6 @@ There are `reboot` and `dish_stow` requests in the Device protocol, too, so it s
 
 Proper Python packaging, since the dependency list keeps growing....
 
-Some of the functionality implemented in the `starlink-grpc` module could be ported into `starlink-json` easily enough, but this won't be a priority unless someone asks for it.
-
 ## Other Tidbits
 
 The Starlink Android app actually uses port 9201 instead of 9200. Both appear to expose the same gRPC service, but the one on port 9201 uses [gRPC-Web](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md), which can use HTTP/1.1, whereas the one on port 9200 uses HTTP/2, which is what most gRPC tools expect.
diff --git a/dish_json_text.py b/dish_json_text.py
new file mode 100644
index 0000000..6525bf8
--- /dev/null
+++ b/dish_json_text.py
@@ -0,0 +1,285 @@
+#!/usr/bin/python3
+r"""Output Starlink user terminal data info in text format.
+ +Expects input as from the following command: + + grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle + +This script examines the most recent samples from the history data and +prints several different metrics computed from them to stdout. By default, +it will print the results in CSV format. +""" + +import argparse +from datetime import datetime +from datetime import timezone +import logging +import re +import sys +import time + +import starlink_json + +BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$") +SAMPLES_DEFAULT = 3600 +HISTORY_STATS_MODES = [ + "ping_drop", "ping_run_length", "ping_latency", "ping_loaded_latency", "usage" +] +VERBOSE_FIELD_MAP = { + # ping_drop fields + "samples": "Parsed samples", + "end_counter": "Sample counter", + "total_ping_drop": "Total ping drop", + "count_full_ping_drop": "Count of drop == 1", + "count_obstructed": "Obstructed", + "total_obstructed_ping_drop": "Obstructed ping drop", + "count_full_obstructed_ping_drop": "Obstructed drop == 1", + "count_unscheduled": "Unscheduled", + "total_unscheduled_ping_drop": "Unscheduled ping drop", + "count_full_unscheduled_ping_drop": "Unscheduled drop == 1", + + # ping_run_length fields + "init_run_fragment": "Initial drop run fragment", + "final_run_fragment": "Final drop run fragment", + "run_seconds": "Per-second drop runs", + "run_minutes": "Per-minute drop runs", + + # ping_latency fields + "mean_all_ping_latency": "Mean RTT, drop < 1", + "deciles_all_ping_latency": "RTT deciles, drop < 1", + "mean_full_ping_latency": "Mean RTT, drop == 0", + "deciles_full_ping_latency": "RTT deciles, drop == 0", + "stdev_full_ping_latency": "RTT standard deviation, drop == 0", + + # ping_loaded_latency is still experimental, so leave those unexplained + + # usage fields + "download_usage": "Bytes downloaded", + "upload_usage": "Bytes uploaded", +} + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Collect status and/or history data from a Starlink user terminal and " + "print it to standard output in text format; by default, will print in CSV format", + add_help=False) + + group = parser.add_argument_group(title="General options") + group.add_argument("-f", "--filename", default="-", help="The file to parse, default: stdin") + group.add_argument("-h", "--help", action="help", help="Be helpful") + group.add_argument("-t", + "--timestamp", + help="UTC time history data was pulled, as YYYY-MM-DD_HH:MM:SS or as " + "seconds since Unix epoch, default: current time") + group.add_argument("-v", "--verbose", action="store_true", help="Be verbose") + + group = parser.add_argument_group(title="History mode options") + group.add_argument("-a", + "--all-samples", + action="store_const", + const=-1, + dest="samples", + help="Parse all valid samples") + group.add_argument("-s", + "--samples", + type=int, + help="Number of data samples to parse, default: all in bulk mode, " + "else " + str(SAMPLES_DEFAULT)) + + group = parser.add_argument_group(title="CSV output options") + group.add_argument("-H", + "--print-header", + action="store_true", + help="Print CSV header instead of parsing data") + + all_modes = HISTORY_STATS_MODES + ["bulk_history"] + parser.add_argument("mode", + nargs="+", + choices=all_modes, + help="The data group to record, one or more of: " + ", ".join(all_modes), + metavar="mode") + + opts = parser.parse_args() + + # for convenience, set flags for whether any mode in a group is selected + opts.history_stats_mode = 
bool(set(HISTORY_STATS_MODES).intersection(opts.mode)) + opts.bulk_mode = "bulk_history" in opts.mode + + if opts.history_stats_mode and opts.bulk_mode: + parser.error("bulk_history cannot be combined with other modes for CSV output") + + if opts.samples is None: + opts.samples = -1 if opts.bulk_mode else SAMPLES_DEFAULT + + if opts.timestamp is None: + opts.history_time = None + else: + try: + opts.history_time = int(opts.timestamp) + except ValueError: + try: + opts.history_time = int( + datetime.strptime(opts.timestamp, "%Y-%m-%d_%H:%M:%S").timestamp()) + except ValueError: + parser.error("Could not parse timestamp") + if opts.verbose: + print("Using timestamp", datetime.fromtimestamp(opts.history_time, tz=timezone.utc)) + + return opts + + +def print_header(opts): + header = ["datetimestamp_utc"] + + def header_add(names): + for name in names: + name, start, end = BRACKETS_RE.match(name).group(1, 4, 5) + if start: + header.extend(name + "_" + str(x) for x in range(int(start), int(end))) + elif end: + header.extend(name + "_" + str(x) for x in range(int(end))) + else: + header.append(name) + + if opts.bulk_mode: + general, bulk = starlink_json.history_bulk_field_names() + header_add(general) + header_add(bulk) + + if opts.history_stats_mode: + groups = starlink_json.history_stats_field_names() + general, ping, runlen, latency, loaded, usage = groups[0:6] + header_add(general) + if "ping_drop" in opts.mode: + header_add(ping) + if "ping_run_length" in opts.mode: + header_add(runlen) + if "ping_loaded_latency" in opts.mode: + header_add(loaded) + if "ping_latency" in opts.mode: + header_add(latency) + if "usage" in opts.mode: + header_add(usage) + + print(",".join(header)) + return 0 + + +def get_data(opts, add_item, add_sequence, add_bulk): + def add_data(data): + for key, val in data.items(): + name, seq = BRACKETS_RE.match(key).group(1, 5) + if seq is None: + add_item(name, val) + else: + add_sequence(name, val) + + if opts.history_stats_mode: + try: + groups = starlink_json.history_stats(opts.filename, opts.samples, verbose=opts.verbose) + except starlink_json.JsonError as e: + logging.error("Failure getting history stats: %s", str(e)) + return 1 + general, ping, runlen, latency, loaded, usage = groups[0:6] + add_data(general) + if "ping_drop" in opts.mode: + add_data(ping) + if "ping_run_length" in opts.mode: + add_data(runlen) + if "ping_latency" in opts.mode: + add_data(latency) + if "ping_loaded_latency" in opts.mode: + add_data(loaded) + if "usage" in opts.mode: + add_data(usage) + + if opts.bulk_mode and add_bulk: + timestamp = int(time.time()) if opts.history_time is None else opts.history_time + try: + general, bulk = starlink_json.history_bulk_data(opts.filename, + opts.samples, + verbose=opts.verbose) + except starlink_json.JsonError as e: + logging.error("Failure getting bulk history: %s", str(e)) + return 1 + parsed_samples = general["samples"] + new_counter = general["end_counter"] + if opts.verbose: + print("Establishing time base: {0} -> {1}".format( + new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc))) + timestamp -= parsed_samples + + add_bulk(bulk, parsed_samples, timestamp, new_counter - parsed_samples) + + return 0 + + +def loop_body(opts): + if opts.verbose: + csv_data = [] + else: + history_time = int(time.time()) if opts.history_time is None else opts.history_time + csv_data = [datetime.utcfromtimestamp(history_time).isoformat()] + + def cb_data_add_item(name, val): + if opts.verbose: + csv_data.append("{0:22} 
{1}".format(VERBOSE_FIELD_MAP.get(name, name) + ":", val)) + else: + # special case for get_status failure: this will be the lone item added + if name == "state" and val == "DISH_UNREACHABLE": + csv_data.extend(["", "", "", val]) + else: + csv_data.append(str(val)) + + def cb_data_add_sequence(name, val): + if opts.verbose: + csv_data.append("{0:22} {1}".format( + VERBOSE_FIELD_MAP.get(name, name) + ":", ", ".join(str(subval) for subval in val))) + else: + csv_data.extend(str(subval) for subval in val) + + def cb_add_bulk(bulk, count, timestamp, counter): + if opts.verbose: + print("Time range (UTC): {0} -> {1}".format( + datetime.utcfromtimestamp(timestamp).isoformat(), + datetime.utcfromtimestamp(timestamp + count).isoformat())) + for key, val in bulk.items(): + print("{0:22} {1}".format(key + ":", ", ".join(str(subval) for subval in val))) + else: + for i in range(count): + timestamp += 1 + fields = [datetime.utcfromtimestamp(timestamp).isoformat()] + fields.extend(["" if val[i] is None else str(val[i]) for val in bulk.values()]) + print(",".join(fields)) + + rc = get_data(opts, cb_data_add_item, cb_data_add_sequence, cb_add_bulk) + + if opts.verbose: + if csv_data: + print("\n".join(csv_data)) + else: + # skip if only timestamp + if len(csv_data) > 1: + print(",".join(csv_data)) + + return rc + + +def main(): + opts = parse_args() + + logging.basicConfig(format="%(levelname)s: %(message)s") + + if opts.print_header: + rc = print_header(opts) + sys.exit(rc) + + # for consistency with dish_grpc_text, pretend there was a loop + rc = loop_body(opts) + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/parseJsonHistory.py b/parseJsonHistory.py deleted file mode 100644 index e12d676..0000000 --- a/parseJsonHistory.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/python -###################################################################### -# -# Example parser for the JSON format history stats output of grpcurl -# for the gRPC service provided on a Starlink user terminal. -# -# Expects input as from the following command: -# grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle -# -# This script examines the most recent samples from the history data -# and computes several different metrics related to packet loss. By -# default, it will print the results in CSV format. -# -###################################################################### - -import datetime -import sys -import getopt -import logging - -import starlink_json - -arg_error = False - -try: - opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") -except getopt.GetoptError as err: - print(str(err)) - arg_error = True - -# Default to 1 hour worth of data samples. -samples_default = 3600 -samples = samples_default -print_usage = False -verbose = False -print_header = False -run_lengths = False - -if not arg_error: - if len(args) > 1: - arg_error = True - else: - for opt, arg in opts: - if opt == "-a": - samples = -1 - elif opt == "-h": - print_usage = True - elif opt == "-r": - run_lengths = True - elif opt == "-s": - samples = int(arg) - elif opt == "-v": - verbose = True - elif opt == "-H": - print_header = True - -if print_usage or arg_error: - print("Usage: " + sys.argv[0] + " [options...] 
[]") - print(" where is the file to parse, default: stdin") - print("Options:") - print(" -a: Parse all valid samples") - print(" -h: Be helpful") - print(" -r: Include ping drop run length stats") - print(" -s : Number of data samples to parse, default: " + str(samples_default)) - print(" -v: Be verbose") - print(" -H: print CSV header instead of parsing file") - sys.exit(1 if arg_error else 0) - -logging.basicConfig(format="%(levelname)s: %(message)s") - -g_fields, pd_fields, rl_fields = starlink_json.history_ping_field_names() - -if print_header: - header = ["datetimestamp_utc"] - header.extend(g_fields) - header.extend(pd_fields) - if run_lengths: - for field in rl_fields: - if field.startswith("run_"): - header.extend(field + "_" + str(x) for x in range(1, 61)) - else: - header.append(field) - print(",".join(header)) - sys.exit(0) - -timestamp = datetime.datetime.utcnow() - -try: - g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", - samples, verbose) -except starlink_json.JsonError as e: - logging.error("Failure getting ping stats: %s", str(e)) - sys.exit(1) - -if verbose: - print("Parsed samples: " + str(g_stats["samples"])) - print("Total ping drop: " + str(pd_stats["total_ping_drop"])) - print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"])) - print("Obstructed: " + str(pd_stats["count_obstructed"])) - print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"])) - print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"])) - print("Unscheduled: " + str(pd_stats["count_unscheduled"])) - print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"])) - print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"])) - if run_lengths: - print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) - print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) - print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"])) - print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) -else: - csv_data = [timestamp.replace(microsecond=0).isoformat()] - csv_data.extend(str(g_stats[field]) for field in g_fields) - csv_data.extend(str(pd_stats[field]) for field in pd_fields) - if run_lengths: - for field in rl_fields: - if field.startswith("run_"): - csv_data.extend(str(substat) for substat in rl_stats[field]) - else: - csv_data.append(str(rl_stats[field])) - print(",".join(csv_data)) diff --git a/starlink_json.py b/starlink_json.py index 7365430..e740c7d 100644 --- a/starlink_json.py +++ b/starlink_json.py @@ -10,6 +10,8 @@ See the starlink_grpc module docstring for descriptions of the stat elements. """ import json +import math +import statistics import sys from itertools import chain @@ -19,16 +21,54 @@ class JsonError(Exception): """Provides error info when something went wrong with JSON parsing.""" -def history_ping_field_names(): - """Return the field names of the packet loss stats. +def history_bulk_field_names(): + """Return the field names of the bulk history data. + + Note: + See `starlink_grpc` module docs regarding brackets in field names. Returns: - A tuple with 3 lists, the first with general stat names, the second - with ping drop stat names, and the third with ping drop run length - stat names. + A tuple with 2 lists, the first with general data names, the second + with bulk history data names. 
""" return [ "samples", + "end_counter", + ], [ + "pop_ping_drop_rate[]", + "pop_ping_latency_ms[]", + "downlink_throughput_bps[]", + "uplink_throughput_bps[]", + "snr[]", + "scheduled[]", + "obstructed[]", + ] + + +def history_ping_field_names(): + """Deprecated. Use history_stats_field_names instead.""" + return history_stats_field_names()[0:3] + + +def history_stats_field_names(): + """Return the field names of the packet loss stats. + + Note: + See `starlink_grpc` module docs regarding brackets in field names. + + Returns: + A tuple with 6 lists, with general data names, ping drop stat names, + ping drop run length stat names, ping latency stat names, loaded ping + latency stat names, and bandwidth usage stat names, in that order. + + Note: + Additional lists may be added to this tuple in the future with + additional data groups, so it not recommended for the caller to + assume exactly 6 elements. + """ + return [ + "samples", + "end_counter", ], [ "total_ping_drop", "count_full_ping_drop", @@ -41,8 +81,22 @@ def history_ping_field_names(): ], [ "init_run_fragment", "final_run_fragment", - "run_seconds", - "run_minutes", + "run_seconds[1,61]", + "run_minutes[1,61]", + ], [ + "mean_all_ping_latency", + "deciles_all_ping_latency[11]", + "mean_full_ping_latency", + "deciles_full_ping_latency[11]", + "stdev_full_ping_latency", + ], [ + "load_bucket_samples[15]", + "load_bucket_min_latency[15]", + "load_bucket_median_latency[15]", + "load_bucket_max_latency[15]", + ], [ + "download_usage", + "upload_usage", ] @@ -55,7 +109,7 @@ def get_history(filename): Raises: Various exceptions depending on Python version: Failure to open or - read input or invalid JSON read on input. + read input or invalid JSON read on input. """ if filename == "-": json_data = json.load(sys.stdin) @@ -65,33 +119,7 @@ def get_history(filename): return json_data["dishGetHistory"] -def history_ping_stats(filename, parse_samples, verbose=False): - """Fetch, parse, and compute the packet loss stats. - - Args: - filename (str): Filename from which to read JSON data, or "-" to read - from standard input. - parse_samples (int): Number of samples to process, or -1 to parse all - available samples. - verbose (bool): Optionally produce verbose output. - - Returns: - A tuple with 3 dicts, the first mapping general stat names to their - values, the second mapping ping drop stat names to their values and - the third mapping ping drop run length stat names to their values. - - Raises: - JsonError: Failure to open, read, or parse JSON on input. - """ - try: - history = get_history(filename) - except ValueError as e: - raise JsonError("Failed to parse JSON: " + str(e)) - except Exception as e: - raise JsonError(e) - - # "current" is the count of data samples written to the ring buffer, - # irrespective of buffer wrap. +def _compute_sample_range(history, parse_samples, verbose=False): current = int(history["current"]) samples = len(history["popPingDropRate"]) @@ -104,9 +132,133 @@ def history_ping_stats(filename, parse_samples, verbose=False): if verbose: print("Valid samples: " + str(samples)) + if parse_samples < 0 or samples < parse_samples: + parse_samples = samples + + start = current - parse_samples + + if start == current: + return range(0), 0, current + # This is ring buffer offset, so both index to oldest data sample and # index to next data sample after the newest one. - offset = current % samples + end_offset = current % samples + start_offset = start % samples + + # Set the range for the requested set of samples. 
This will iterate + # sample index in order from oldest to newest. + if start_offset < end_offset: + sample_range = range(start_offset, end_offset) + else: + sample_range = chain(range(start_offset, samples), range(0, end_offset)) + + return sample_range, current - start, current + + +def history_bulk_data(filename, parse_samples, verbose=False): + """Fetch history data for a range of samples. + + Args: + filename (str): Filename from which to read JSON data, or "-" to read + from standard input. + parse_samples (int): Number of samples to process, or -1 to parse all + available samples. + verbose (bool): Optionally produce verbose output. + + Returns: + A tuple with 2 dicts, the first mapping general data names to their + values and the second mapping bulk history data names to their values. + + Note: The field names in the returned data do _not_ include brackets + to indicate sequences, since those would just need to be parsed + out. The general data is all single items and the bulk history + data is all sequences. + + Raises: + JsonError: Failure to open, read, or parse JSON on input. + """ + try: + history = get_history(filename) + except ValueError as e: + raise JsonError("Failed to parse JSON: " + str(e)) + except Exception as e: + raise JsonError(e) + + sample_range, parsed_samples, current = _compute_sample_range(history, + parse_samples, + verbose=verbose) + + pop_ping_drop_rate = [] + pop_ping_latency_ms = [] + downlink_throughput_bps = [] + uplink_throughput_bps = [] + snr = [] + scheduled = [] + obstructed = [] + + for i in sample_range: + pop_ping_drop_rate.append(history["popPingDropRate"][i]) + pop_ping_latency_ms.append( + history["popPingLatencyMs"][i] if history["popPingDropRate"][i] < 1 else None) + downlink_throughput_bps.append(history["downlinkThroughputBps"][i]) + uplink_throughput_bps.append(history["uplinkThroughputBps"][i]) + snr.append(history["snr"][i]) + scheduled.append(history["scheduled"][i]) + obstructed.append(history["obstructed"][i]) + + return { + "samples": parsed_samples, + "end_counter": current, + }, { + "pop_ping_drop_rate": pop_ping_drop_rate, + "pop_ping_latency_ms": pop_ping_latency_ms, + "downlink_throughput_bps": downlink_throughput_bps, + "uplink_throughput_bps": uplink_throughput_bps, + "snr": snr, + "scheduled": scheduled, + "obstructed": obstructed, + } + + +def history_ping_stats(filename, parse_samples, verbose=False): + """Deprecated. Use history_stats instead.""" + return history_stats(filename, parse_samples, verbose=verbose)[0:3] + + +def history_stats(filename, parse_samples, verbose=False): + """Fetch, parse, and compute ping and usage stats. + + Args: + filename (str): Filename from which to read JSON data, or "-" to read + from standard input. + parse_samples (int): Number of samples to process, or -1 to parse all + available samples. + verbose (bool): Optionally produce verbose output. + + Returns: + A tuple with 6 dicts, mapping general data names, ping drop stat + names, ping drop run length stat names, ping latency stat names, + loaded ping latency stat names, and bandwidth usage stat names to + their respective values, in that order. + + Note: + Additional dicts may be added to this tuple in the future with + additional data groups, so it not recommended for the caller to + assume exactly 6 elements. + + Raises: + JsonError: Failure to open, read, or parse JSON on input. 
+ """ + try: + history = get_history(filename) + except ValueError as e: + raise JsonError("Failed to parse JSON: " + str(e)) + except Exception as e: + raise JsonError(e) + + sample_range, parsed_samples, current = _compute_sample_range(history, + parse_samples, + verbose=verbose) tot = 0.0 count_full_drop = 0 @@ -122,15 +274,12 @@ def history_ping_stats(filename, parse_samples, verbose=False): run_length = 0 init_run_length = None - if parse_samples < 0 or samples < parse_samples: - parse_samples = samples + usage_down = 0.0 + usage_up = 0.0 - # Parse the most recent parse_samples-sized set of samples. This will - # iterate samples in order from oldest to newest. - if parse_samples <= offset: - sample_range = range(offset - parse_samples, offset) - else: - sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) + rtt_full = [] + rtt_all = [] + rtt_buckets = [[] for _ in range(15)] for i in sample_range: d = history["popPingDropRate"][i] @@ -165,6 +314,22 @@ def history_ping_stats(filename, parse_samples, verbose=False): count_full_obstruct += 1 tot += d + down = history["downlinkThroughputBps"][i] + usage_down += down + up = history["uplinkThroughputBps"][i] + usage_up += up + + rtt = history["popPingLatencyMs"][i] + # note that "full" here means the opposite of ping drop full + if d == 0.0: + rtt_full.append(rtt) + if down + up > 500000: + rtt_buckets[min(14, int(math.log2((down+up) / 500000)))].append(rtt) + else: + rtt_buckets[0].append(rtt) + if d < 1.0: + rtt_all.append((rtt, 1.0 - d)) + # If the entire sample set is one big drop run, it will be both initial # fragment (continued from prior sample range) and final one (continued # to next sample range), but to avoid double-reporting, just call it @@ -173,8 +338,58 @@ def history_ping_stats(filename, parse_samples, verbose=False): init_run_length = run_length run_length = 0 + def weighted_mean_and_quantiles(data, n): + if not data: + return None, [None] * (n+1) + total_weight = sum(x[1] for x in data) + result = [] + items = iter(data) + value, accum_weight = next(items) + accum_value = value * accum_weight + for boundary in (total_weight * x / n for x in range(n)): + while accum_weight < boundary: + try: + value, weight = next(items) + accum_value += value * weight + accum_weight += weight + except StopIteration: + # shouldn't happen, but in case of float precision weirdness... 
+ break + result.append(value) + result.append(data[-1][0]) + accum_value += sum(x[0] for x in items) + return accum_value / total_weight, result + + bucket_samples = [] + bucket_min = [] + bucket_median = [] + bucket_max = [] + for bucket in rtt_buckets: + if bucket: + bucket_samples.append(len(bucket)) + bucket_min.append(min(bucket)) + bucket_median.append(statistics.median(bucket)) + bucket_max.append(max(bucket)) + else: + bucket_samples.append(0) + bucket_min.append(None) + bucket_median.append(None) + bucket_max.append(None) + + rtt_all.sort(key=lambda x: x[0]) + wmean_all, wdeciles_all = weighted_mean_and_quantiles(rtt_all, 10) + if len(rtt_full) > 1: + deciles_full = [min(rtt_full)] + deciles_full.extend(statistics.quantiles(rtt_full, n=10, method="inclusive")) + deciles_full.append(max(rtt_full)) + elif rtt_full: + deciles_full = [rtt_full[0]] * 11 + else: + deciles_full = [None] * 11 + return { - "samples": parse_samples, + "samples": parsed_samples, + "end_counter": current, }, { "total_ping_drop": tot, "count_full_ping_drop": count_full_drop, @@ -187,6 +402,20 @@ def history_ping_stats(filename, parse_samples, verbose=False): }, { "init_run_fragment": init_run_length, "final_run_fragment": run_length, - "run_seconds": second_runs, - "run_minutes": minute_runs, + "run_seconds[1,]": second_runs, + "run_minutes[1,]": minute_runs, + }, { + "mean_all_ping_latency": wmean_all, + "deciles_all_ping_latency[]": wdeciles_all, + "mean_full_ping_latency": statistics.fmean(rtt_full) if rtt_full else None, + "deciles_full_ping_latency[]": deciles_full, + "stdev_full_ping_latency": statistics.pstdev(rtt_full) if rtt_full else None, + }, { + "load_bucket_samples[]": bucket_samples, + "load_bucket_min_latency[]": bucket_min, + "load_bucket_median_latency[]": bucket_median, + "load_bucket_max_latency[]": bucket_max, + }, { + "download_usage": int(round(usage_down / 8)), + "upload_usage": int(round(usage_up / 8)), }
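
As a sketch of the capture-then-parse workflow the README change above describes: the file name and timestamp below are illustrative, but the options and modes are the ones `dish_json_text.py` defines in this patch (`-f`, `-t`, and the history stats modes).

```shell script
# On a machine that can reach the dish, save the raw history JSON to a file.
grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle > history.json
# Later, possibly on a different machine, parse the capture. The -t value records the
# UTC time the data was pulled, so output timestamps reflect that instead of parse time.
python3 dish_json_text.py -f history.json -t 2021-02-21_21:49:45 ping_drop ping_run_length
```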