Port grpc history features to JSON parser script

This brings most of the history-related functionality implemented in the grpc scripts to the JSON version, but only for text output. It also renames parserJsonHistory.py to dish_json_text.py, which removes the last remaining complaint from pylint about module name not conforming to style conventions.

A lot of this is just duplicated code from dish_common and dish_grpc_text, just simplified a little where some of the flexibility wasn't needed.

This removes compatibility with Python 2.7, because I didn't feel like reimplementing statistics.pstdev and didn't think such compatibility was particularly important.
This commit is contained in:
sparky8512 2021-02-21 13:49:45 -08:00
parent 38987054b9
commit 258a33d62d
4 changed files with 566 additions and 176 deletions

View file

@ -5,9 +5,9 @@ For more information on what Starlink is, see [starlink.com](https://www.starlin
## Prerequisites ## Prerequisites
Most of the scripts here are [Python](https://www.python.org/) scripts. To use them, you will either need Python installed on your system or you can use the Docker image. If you use the Docker image, you can skip the rest of the prerequisites other than making sure the dish IP is reachable and Docker itself. For Linux systems, the python package from your distribution should be fine, as long as it is Python 3. The JSON script should actually work with Python 2.7, but the grpc scripts all require Python 3 (and Python 2.7 is past end-of-life, so is not recommended anyway). Most of the scripts here are [Python](https://www.python.org/) scripts. To use them, you will either need Python installed on your system or you can use the Docker image. If you use the Docker image, you can skip the rest of the prerequisites other than making sure the dish IP is reachable and Docker itself. For Linux systems, the python package from your distribution should be fine, as long as it is Python 3.
All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. See [here](https://github.com/starlink-community/knowledge-base/wiki#using-your-own-router) for more detail on this.
Running the scripts within a [Docker](https://www.docker.com/) container requires Docker to be installed. Information about how to install that can be found at https://docs.docker.com/engine/install/ Running the scripts within a [Docker](https://www.docker.com/) container requires Docker to be installed. Information about how to install that can be found at https://docs.docker.com/engine/install/
@ -83,17 +83,15 @@ Some of the scripts (currently only the InfluxDB one) also support specifying op
### The JSON parser script ### The JSON parser script
`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example: `dish_json_text.py` is similar to `dish_grpc_text.py`, but it takes JSON format input from a file instead of pulling it directly from the dish via grpc call. It also does not support the status info modes, because those are easy enough to interpret directly from the JSON data. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
```shell script ```shell script
grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python3 dish_json_text.py ping_drop
``` ```
For more usage options, run: For more usage options, run:
```shell script ```shell script
python parseJsonHistory.py -h python3 dish_json_text.py -h
``` ```
When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script (or more likely the parsing logic it uses, which now resides in `starlink_json.py`) could be used as a starting point or example of how to iterate through it.
The one bit of functionality this script has over the grpc scripts is that it supports capturing the grpcurl output to a file and reading from that, which may be useful if you're collecting data in one place but analyzing it in another. Otherwise, it's probably better to use `dish_grpc_text.py`, described above. The one bit of functionality this script has over the grpc scripts is that it supports capturing the grpcurl output to a file and reading from that, which may be useful if you're collecting data in one place but analyzing it in another. Otherwise, it's probably better to use `dish_grpc_text.py`, described above.
### Other scripts ### Other scripts
@ -139,8 +137,6 @@ There are `reboot` and `dish_stow` requests in the Device protocol, too, so it s
Proper Python packaging, since the dependency list keeps growing.... Proper Python packaging, since the dependency list keeps growing....
Some of the functionality implemented in the `starlink-grpc` module could be ported into `starlink-json` easily enough, but this won't be a priority unless someone asks for it.
## Other Tidbits ## Other Tidbits
The Starlink Android app actually uses port 9201 instead of 9200. Both appear to expose the same gRPC service, but the one on port 9201 uses [gRPC-Web](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md), which can use HTTP/1.1, whereas the one on port 9200 uses HTTP/2, which is what most gRPC tools expect. The Starlink Android app actually uses port 9201 instead of 9200. Both appear to expose the same gRPC service, but the one on port 9201 uses [gRPC-Web](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md), which can use HTTP/1.1, whereas the one on port 9200 uses HTTP/2, which is what most gRPC tools expect.

285
dish_json_text.py Normal file
View file

@ -0,0 +1,285 @@
#!/usr/bin/python3
r"""Output Starlink user terminal data info in text format.
Expects input as from the following command:
grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle
This script examines the most recent samples from the history data and
prints several different metrics computed from them to stdout. By default,
it will print the results in CSV format.
"""
import argparse
from datetime import datetime
from datetime import timezone
import logging
import re
import sys
import time
import starlink_json
BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$")
SAMPLES_DEFAULT = 3600
HISTORY_STATS_MODES = [
"ping_drop", "ping_run_length", "ping_latency", "ping_loaded_latency", "usage"
]
VERBOSE_FIELD_MAP = {
# ping_drop fields
"samples": "Parsed samples",
"end_counter": "Sample counter",
"total_ping_drop": "Total ping drop",
"count_full_ping_drop": "Count of drop == 1",
"count_obstructed": "Obstructed",
"total_obstructed_ping_drop": "Obstructed ping drop",
"count_full_obstructed_ping_drop": "Obstructed drop == 1",
"count_unscheduled": "Unscheduled",
"total_unscheduled_ping_drop": "Unscheduled ping drop",
"count_full_unscheduled_ping_drop": "Unscheduled drop == 1",
# ping_run_length fields
"init_run_fragment": "Initial drop run fragment",
"final_run_fragment": "Final drop run fragment",
"run_seconds": "Per-second drop runs",
"run_minutes": "Per-minute drop runs",
# ping_latency fields
"mean_all_ping_latency": "Mean RTT, drop < 1",
"deciles_all_ping_latency": "RTT deciles, drop < 1",
"mean_full_ping_latency": "Mean RTT, drop == 0",
"deciles_full_ping_latency": "RTT deciles, drop == 0",
"stdev_full_ping_latency": "RTT standard deviation, drop == 0",
# ping_loaded_latency is still experimental, so leave those unexplained
# usage fields
"download_usage": "Bytes downloaded",
"upload_usage": "Bytes uploaded",
}
def parse_args():
parser = argparse.ArgumentParser(
description="Collect status and/or history data from a Starlink user terminal and "
"print it to standard output in text format; by default, will print in CSV format",
add_help=False)
group = parser.add_argument_group(title="General options")
group.add_argument("-f", "--filename", default="-", help="The file to parse, default: stdin")
group.add_argument("-h", "--help", action="help", help="Be helpful")
group.add_argument("-t",
"--timestamp",
help="UTC time history data was pulled, as YYYY-MM-DD_HH:MM:SS or as "
"seconds since Unix epoch, default: current time")
group.add_argument("-v", "--verbose", action="store_true", help="Be verbose")
group = parser.add_argument_group(title="History mode options")
group.add_argument("-a",
"--all-samples",
action="store_const",
const=-1,
dest="samples",
help="Parse all valid samples")
group.add_argument("-s",
"--samples",
type=int,
help="Number of data samples to parse, default: all in bulk mode, "
"else " + str(SAMPLES_DEFAULT))
group = parser.add_argument_group(title="CSV output options")
group.add_argument("-H",
"--print-header",
action="store_true",
help="Print CSV header instead of parsing data")
all_modes = HISTORY_STATS_MODES + ["bulk_history"]
parser.add_argument("mode",
nargs="+",
choices=all_modes,
help="The data group to record, one or more of: " + ", ".join(all_modes),
metavar="mode")
opts = parser.parse_args()
# for convenience, set flags for whether any mode in a group is selected
opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode))
opts.bulk_mode = "bulk_history" in opts.mode
if opts.history_stats_mode and opts.bulk_mode:
parser.error("bulk_history cannot be combined with other modes for CSV output")
if opts.samples is None:
opts.samples = -1 if opts.bulk_mode else SAMPLES_DEFAULT
if opts.timestamp is None:
opts.history_time = None
else:
try:
opts.history_time = int(opts.timestamp)
except ValueError:
try:
opts.history_time = int(
datetime.strptime(opts.timestamp, "%Y-%m-%d_%H:%M:%S").timestamp())
except ValueError:
parser.error("Could not parse timestamp")
if opts.verbose:
print("Using timestamp", datetime.fromtimestamp(opts.history_time, tz=timezone.utc))
return opts
def print_header(opts):
header = ["datetimestamp_utc"]
def header_add(names):
for name in names:
name, start, end = BRACKETS_RE.match(name).group(1, 4, 5)
if start:
header.extend(name + "_" + str(x) for x in range(int(start), int(end)))
elif end:
header.extend(name + "_" + str(x) for x in range(int(end)))
else:
header.append(name)
if opts.bulk_mode:
general, bulk = starlink_json.history_bulk_field_names()
header_add(general)
header_add(bulk)
if opts.history_stats_mode:
groups = starlink_json.history_stats_field_names()
general, ping, runlen, latency, loaded, usage = groups[0:6]
header_add(general)
if "ping_drop" in opts.mode:
header_add(ping)
if "ping_run_length" in opts.mode:
header_add(runlen)
if "ping_loaded_latency" in opts.mode:
header_add(loaded)
if "ping_latency" in opts.mode:
header_add(latency)
if "usage" in opts.mode:
header_add(usage)
print(",".join(header))
return 0
def get_data(opts, add_item, add_sequence, add_bulk):
def add_data(data):
for key, val in data.items():
name, seq = BRACKETS_RE.match(key).group(1, 5)
if seq is None:
add_item(name, val)
else:
add_sequence(name, val)
if opts.history_stats_mode:
try:
groups = starlink_json.history_stats(opts.filename, opts.samples, verbose=opts.verbose)
except starlink_json.JsonError as e:
logging.error("Failure getting history stats: %s", str(e))
return 1
general, ping, runlen, latency, loaded, usage = groups[0:6]
add_data(general)
if "ping_drop" in opts.mode:
add_data(ping)
if "ping_run_length" in opts.mode:
add_data(runlen)
if "ping_latency" in opts.mode:
add_data(latency)
if "ping_loaded_latency" in opts.mode:
add_data(loaded)
if "usage" in opts.mode:
add_data(usage)
if opts.bulk_mode and add_bulk:
timestamp = int(time.time()) if opts.history_time is None else opts.history_time
try:
general, bulk = starlink_json.history_bulk_data(opts.filename,
opts.samples,
verbose=opts.verbose)
except starlink_json.JsonError as e:
logging.error("Failure getting bulk history: %s", str(e))
return 1
parsed_samples = general["samples"]
new_counter = general["end_counter"]
if opts.verbose:
print("Establishing time base: {0} -> {1}".format(
new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc)))
timestamp -= parsed_samples
add_bulk(bulk, parsed_samples, timestamp, new_counter - parsed_samples)
return 0
def loop_body(opts):
if opts.verbose:
csv_data = []
else:
history_time = int(time.time()) if opts.history_time is None else opts.history_time
csv_data = [datetime.utcfromtimestamp(history_time).isoformat()]
def cb_data_add_item(name, val):
if opts.verbose:
csv_data.append("{0:22} {1}".format(VERBOSE_FIELD_MAP.get(name, name) + ":", val))
else:
# special case for get_status failure: this will be the lone item added
if name == "state" and val == "DISH_UNREACHABLE":
csv_data.extend(["", "", "", val])
else:
csv_data.append(str(val))
def cb_data_add_sequence(name, val):
if opts.verbose:
csv_data.append("{0:22} {1}".format(
VERBOSE_FIELD_MAP.get(name, name) + ":", ", ".join(str(subval) for subval in val)))
else:
csv_data.extend(str(subval) for subval in val)
def cb_add_bulk(bulk, count, timestamp, counter):
if opts.verbose:
print("Time range (UTC): {0} -> {1}".format(
datetime.utcfromtimestamp(timestamp).isoformat(),
datetime.utcfromtimestamp(timestamp + count).isoformat()))
for key, val in bulk.items():
print("{0:22} {1}".format(key + ":", ", ".join(str(subval) for subval in val)))
else:
for i in range(count):
timestamp += 1
fields = [datetime.utcfromtimestamp(timestamp).isoformat()]
fields.extend(["" if val[i] is None else str(val[i]) for val in bulk.values()])
print(",".join(fields))
rc = get_data(opts, cb_data_add_item, cb_data_add_sequence, cb_add_bulk)
if opts.verbose:
if csv_data:
print("\n".join(csv_data))
else:
# skip if only timestamp
if len(csv_data) > 1:
print(",".join(csv_data))
return rc
def main():
opts = parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s")
if opts.print_header:
rc = print_header(opts)
sys.exit(rc)
# for consistency with dish_grpc_text, pretend there was a loop
rc = loop_body(opts)
sys.exit(rc)
if __name__ == '__main__':
main()

View file

@ -1,120 +0,0 @@
#!/usr/bin/python
######################################################################
#
# Example parser for the JSON format history stats output of grpcurl
# for the gRPC service provided on a Starlink user terminal.
#
# Expects input as from the following command:
# grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle
#
# This script examines the most recent samples from the history data
# and computes several different metrics related to packet loss. By
# default, it will print the results in CSV format.
#
######################################################################
import datetime
import sys
import getopt
import logging
import starlink_json
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
# Default to 1 hour worth of data samples.
samples_default = 3600
samples = samples_default
print_usage = False
verbose = False
print_header = False
run_lengths = False
if not arg_error:
if len(args) > 1:
arg_error = True
else:
for opt, arg in opts:
if opt == "-a":
samples = -1
elif opt == "-h":
print_usage = True
elif opt == "-r":
run_lengths = True
elif opt == "-s":
samples = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-H":
print_header = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...] [<file>]")
print(" where <file> is the file to parse, default: stdin")
print("Options:")
print(" -a: Parse all valid samples")
print(" -h: Be helpful")
print(" -r: Include ping drop run length stats")
print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
print(" -v: Be verbose")
print(" -H: print CSV header instead of parsing file")
sys.exit(1 if arg_error else 0)
logging.basicConfig(format="%(levelname)s: %(message)s")
g_fields, pd_fields, rl_fields = starlink_json.history_ping_field_names()
if print_header:
header = ["datetimestamp_utc"]
header.extend(g_fields)
header.extend(pd_fields)
if run_lengths:
for field in rl_fields:
if field.startswith("run_"):
header.extend(field + "_" + str(x) for x in range(1, 61))
else:
header.append(field)
print(",".join(header))
sys.exit(0)
timestamp = datetime.datetime.utcnow()
try:
g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-",
samples, verbose)
except starlink_json.JsonError as e:
logging.error("Failure getting ping stats: %s", str(e))
sys.exit(1)
if verbose:
print("Parsed samples: " + str(g_stats["samples"]))
print("Total ping drop: " + str(pd_stats["total_ping_drop"]))
print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"]))
print("Obstructed: " + str(pd_stats["count_obstructed"]))
print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"]))
print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"]))
print("Unscheduled: " + str(pd_stats["count_unscheduled"]))
print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"]))
print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"]))
if run_lengths:
print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"]))
print("Final drop run fragment: " + str(rl_stats["final_run_fragment"]))
print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"]))
print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"]))
else:
csv_data = [timestamp.replace(microsecond=0).isoformat()]
csv_data.extend(str(g_stats[field]) for field in g_fields)
csv_data.extend(str(pd_stats[field]) for field in pd_fields)
if run_lengths:
for field in rl_fields:
if field.startswith("run_"):
csv_data.extend(str(substat) for substat in rl_stats[field])
else:
csv_data.append(str(rl_stats[field]))
print(",".join(csv_data))

View file

@ -10,6 +10,8 @@ See the starlink_grpc module docstring for descriptions of the stat elements.
""" """
import json import json
import math
import statistics
import sys import sys
from itertools import chain from itertools import chain
@ -19,16 +21,54 @@ class JsonError(Exception):
"""Provides error info when something went wrong with JSON parsing.""" """Provides error info when something went wrong with JSON parsing."""
def history_ping_field_names(): def history_bulk_field_names():
"""Return the field names of the packet loss stats. """Return the field names of the bulk history data.
Note:
See `starlink_grpc` module docs regarding brackets in field names.
Returns: Returns:
A tuple with 3 lists, the first with general stat names, the second A tuple with 2 lists, the first with general data names, the second
with ping drop stat names, and the third with ping drop run length with bulk history data names.
stat names.
""" """
return [ return [
"samples", "samples",
"end_counter",
], [
"pop_ping_drop_rate[]",
"pop_ping_latency_ms[]",
"downlink_throughput_bps[]",
"uplink_throughput_bps[]",
"snr[]",
"scheduled[]",
"obstructed[]",
]
def history_ping_field_names():
"""Deprecated. Use history_stats_field_names instead."""
return history_stats_field_names()[0:3]
def history_stats_field_names():
"""Return the field names of the packet loss stats.
Note:
See `starlink_grpc` module docs regarding brackets in field names.
Returns:
A tuple with 6 lists, with general data names, ping drop stat names,
ping drop run length stat names, ping latency stat names, loaded ping
latency stat names, and bandwidth usage stat names, in that order.
Note:
Additional lists may be added to this tuple in the future with
additional data groups, so it not recommended for the caller to
assume exactly 6 elements.
"""
return [
"samples",
"end_counter",
], [ ], [
"total_ping_drop", "total_ping_drop",
"count_full_ping_drop", "count_full_ping_drop",
@ -41,8 +81,22 @@ def history_ping_field_names():
], [ ], [
"init_run_fragment", "init_run_fragment",
"final_run_fragment", "final_run_fragment",
"run_seconds", "run_seconds[1,61]",
"run_minutes", "run_minutes[1,61]",
], [
"mean_all_ping_latency",
"deciles_all_ping_latency[11]",
"mean_full_ping_latency",
"deciles_full_ping_latency[11]",
"stdev_full_ping_latency",
], [
"load_bucket_samples[15]",
"load_bucket_min_latency[15]",
"load_bucket_median_latency[15]",
"load_bucket_max_latency[15]",
], [
"download_usage",
"upload_usage",
] ]
@ -65,33 +119,7 @@ def get_history(filename):
return json_data["dishGetHistory"] return json_data["dishGetHistory"]
def history_ping_stats(filename, parse_samples, verbose=False): def _compute_sample_range(history, parse_samples, verbose=False):
"""Fetch, parse, and compute the packet loss stats.
Args:
filename (str): Filename from which to read JSON data, or "-" to read
from standard input.
parse_samples (int): Number of samples to process, or -1 to parse all
available samples.
verbose (bool): Optionally produce verbose output.
Returns:
A tuple with 3 dicts, the first mapping general stat names to their
values, the second mapping ping drop stat names to their values and
the third mapping ping drop run length stat names to their values.
Raises:
JsonError: Failure to open, read, or parse JSON on input.
"""
try:
history = get_history(filename)
except ValueError as e:
raise JsonError("Failed to parse JSON: " + str(e))
except Exception as e:
raise JsonError(e)
# "current" is the count of data samples written to the ring buffer,
# irrespective of buffer wrap.
current = int(history["current"]) current = int(history["current"])
samples = len(history["popPingDropRate"]) samples = len(history["popPingDropRate"])
@ -104,9 +132,133 @@ def history_ping_stats(filename, parse_samples, verbose=False):
if verbose: if verbose:
print("Valid samples: " + str(samples)) print("Valid samples: " + str(samples))
if parse_samples < 0 or samples < parse_samples:
parse_samples = samples
start = current - parse_samples
if start == current:
return range(0), 0, current
# This is ring buffer offset, so both index to oldest data sample and # This is ring buffer offset, so both index to oldest data sample and
# index to next data sample after the newest one. # index to next data sample after the newest one.
offset = current % samples end_offset = current % samples
start_offset = start % samples
# Set the range for the requested set of samples. This will iterate
# sample index in order from oldest to newest.
if start_offset < end_offset:
sample_range = range(start_offset, end_offset)
else:
sample_range = chain(range(start_offset, samples), range(0, end_offset))
return sample_range, current - start, current
def history_bulk_data(filename, parse_samples, verbose=False):
"""Fetch history data for a range of samples.
Args:
filename (str): Filename from which to read JSON data, or "-" to read
from standard input.
parse_samples (int): Number of samples to process, or -1 to parse all
available samples.
verbose (bool): Optionally produce verbose output.
Returns:
A tuple with 2 dicts, the first mapping general data names to their
values and the second mapping bulk history data names to their values.
Note: The field names in the returned data do _not_ include brackets
to indicate sequences, since those would just need to be parsed
out. The general data is all single items and the bulk history
data is all sequences.
Raises:
JsonError: Failure to open, read, or parse JSON on input.
"""
try:
history = get_history(filename)
except ValueError as e:
raise JsonError("Failed to parse JSON: " + str(e))
except Exception as e:
raise JsonError(e)
sample_range, parsed_samples, current = _compute_sample_range(history,
parse_samples,
verbose=verbose)
pop_ping_drop_rate = []
pop_ping_latency_ms = []
downlink_throughput_bps = []
uplink_throughput_bps = []
snr = []
scheduled = []
obstructed = []
for i in sample_range:
pop_ping_drop_rate.append(history["popPingDropRate"][i])
pop_ping_latency_ms.append(
history["popPingLatencyMs"][i] if history["popPingDropRate"][i] < 1 else None)
downlink_throughput_bps.append(history["downlinkThroughputBps"][i])
uplink_throughput_bps.append(history["uplinkThroughputBps"][i])
snr.append(history["snr"][i])
scheduled.append(history["scheduled"][i])
obstructed.append(history["obstructed"][i])
return {
"samples": parsed_samples,
"end_counter": current,
}, {
"pop_ping_drop_rate": pop_ping_drop_rate,
"pop_ping_latency_ms": pop_ping_latency_ms,
"downlink_throughput_bps": downlink_throughput_bps,
"uplink_throughput_bps": uplink_throughput_bps,
"snr": snr,
"scheduled": scheduled,
"obstructed": obstructed,
}
def history_ping_stats(filename, parse_samples, verbose=False):
"""Deprecated. Use history_stats instead."""
return history_stats(filename, parse_samples, verbose=verbose)[0:3]
def history_stats(filename, parse_samples, verbose=False):
"""Fetch, parse, and compute ping and usage stats.
Args:
filename (str): Filename from which to read JSON data, or "-" to read
from standard input.
parse_samples (int): Number of samples to process, or -1 to parse all
available samples.
verbose (bool): Optionally produce verbose output.
Returns:
A tuple with 6 dicts, mapping general data names, ping drop stat
names, ping drop run length stat names, ping latency stat names,
loaded ping latency stat names, and bandwidth usage stat names to
their respective values, in that order.
Note:
Additional dicts may be added to this tuple in the future with
additional data groups, so it not recommended for the caller to
assume exactly 6 elements.
Raises:
JsonError: Failure to open, read, or parse JSON on input.
"""
try:
history = get_history(filename)
except ValueError as e:
raise JsonError("Failed to parse JSON: " + str(e))
except Exception as e:
raise JsonError(e)
sample_range, parsed_samples, current = _compute_sample_range(history,
parse_samples,
verbose=verbose)
tot = 0.0 tot = 0.0
count_full_drop = 0 count_full_drop = 0
@ -122,15 +274,12 @@ def history_ping_stats(filename, parse_samples, verbose=False):
run_length = 0 run_length = 0
init_run_length = None init_run_length = None
if parse_samples < 0 or samples < parse_samples: usage_down = 0.0
parse_samples = samples usage_up = 0.0
# Parse the most recent parse_samples-sized set of samples. This will rtt_full = []
# iterate samples in order from oldest to newest. rtt_all = []
if parse_samples <= offset: rtt_buckets = [[] for _ in range(15)]
sample_range = range(offset - parse_samples, offset)
else:
sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
for i in sample_range: for i in sample_range:
d = history["popPingDropRate"][i] d = history["popPingDropRate"][i]
@ -165,6 +314,22 @@ def history_ping_stats(filename, parse_samples, verbose=False):
count_full_obstruct += 1 count_full_obstruct += 1
tot += d tot += d
down = history["downlinkThroughputBps"][i]
usage_down += down
up = history["uplinkThroughputBps"][i]
usage_up += up
rtt = history["popPingLatencyMs"][i]
# note that "full" here means the opposite of ping drop full
if d == 0.0:
rtt_full.append(rtt)
if down + up > 500000:
rtt_buckets[min(14, int(math.log2((down+up) / 500000)))].append(rtt)
else:
rtt_buckets[0].append(rtt)
if d < 1.0:
rtt_all.append((rtt, 1.0 - d))
# If the entire sample set is one big drop run, it will be both initial # If the entire sample set is one big drop run, it will be both initial
# fragment (continued from prior sample range) and final one (continued # fragment (continued from prior sample range) and final one (continued
# to next sample range), but to avoid double-reporting, just call it # to next sample range), but to avoid double-reporting, just call it
@ -173,8 +338,58 @@ def history_ping_stats(filename, parse_samples, verbose=False):
init_run_length = run_length init_run_length = run_length
run_length = 0 run_length = 0
def weighted_mean_and_quantiles(data, n):
if not data:
return None, [None] * (n+1)
total_weight = sum(x[1] for x in data)
result = []
items = iter(data)
value, accum_weight = next(items)
accum_value = value * accum_weight
for boundary in (total_weight * x / n for x in range(n)):
while accum_weight < boundary:
try:
value, weight = next(items)
accum_value += value * weight
accum_weight += weight
except StopIteration:
# shouldn't happen, but in case of float precision weirdness...
break
result.append(value)
result.append(data[-1][0])
accum_value += sum(x[0] for x in items)
return accum_value / total_weight, result
bucket_samples = []
bucket_min = []
bucket_median = []
bucket_max = []
for bucket in rtt_buckets:
if bucket:
bucket_samples.append(len(bucket))
bucket_min.append(min(bucket))
bucket_median.append(statistics.median(bucket))
bucket_max.append(max(bucket))
else:
bucket_samples.append(0)
bucket_min.append(None)
bucket_median.append(None)
bucket_max.append(None)
rtt_all.sort(key=lambda x: x[0])
wmean_all, wdeciles_all = weighted_mean_and_quantiles(rtt_all, 10)
if len(rtt_full) > 1:
deciles_full = [min(rtt_full)]
deciles_full.extend(statistics.quantiles(rtt_full, n=10, method="inclusive"))
deciles_full.append(max(rtt_full))
elif rtt_full:
deciles_full = [rtt_full[0]] * 11
else:
deciles_full = [None] * 11
return { return {
"samples": parse_samples, "samples": parsed_samples,
"end_counter": current,
}, { }, {
"total_ping_drop": tot, "total_ping_drop": tot,
"count_full_ping_drop": count_full_drop, "count_full_ping_drop": count_full_drop,
@ -187,6 +402,20 @@ def history_ping_stats(filename, parse_samples, verbose=False):
}, { }, {
"init_run_fragment": init_run_length, "init_run_fragment": init_run_length,
"final_run_fragment": run_length, "final_run_fragment": run_length,
"run_seconds": second_runs, "run_seconds[1,]": second_runs,
"run_minutes": minute_runs, "run_minutes[1,]": minute_runs,
}, {
"mean_all_ping_latency": wmean_all,
"deciles_all_ping_latency[]": wdeciles_all,
"mean_full_ping_latency": statistics.fmean(rtt_full) if rtt_full else None,
"deciles_full_ping_latency[]": deciles_full,
"stdev_full_ping_latency": statistics.pstdev(rtt_full) if rtt_full else None,
}, {
"load_bucket_samples[]": bucket_samples,
"load_bucket_min_latency[]": bucket_min,
"load_bucket_median_latency[]": bucket_median,
"load_bucket_max_latency[]": bucket_max,
}, {
"download_usage": int(round(usage_down / 8)),
"upload_usage": int(round(usage_up / 8)),
} }