From 4ff6cfb5fae7198be8b6e09ab3a8aea02ba91e04 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 30 Dec 2020 13:01:41 -0800 Subject: [PATCH 1/4] Fix line endings to be consistent with the other scripts --- parseJsonHistory.py | 380 ++++++++++++++++++++++---------------------- 1 file changed, 190 insertions(+), 190 deletions(-) diff --git a/parseJsonHistory.py b/parseJsonHistory.py index 8c80386..b7a993d 100644 --- a/parseJsonHistory.py +++ b/parseJsonHistory.py @@ -1,190 +1,190 @@ -#!/usr/bin/python -###################################################################### -# -# Example parser for the JSON format history stats output of grpcurl -# for the gRPC service provided on a Starlink user terminal. -# -# Expects input as from the following command: -# grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle -# -# This script examines the most recent samples from the history data -# and computes several different metrics related to packet loss. By -# default, it will print the results in CSV format. -# -###################################################################### - -import json -import datetime -import sys -import getopt - -from itertools import chain - -fArgError = False - -try: - opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") -except getopt.GetoptError as err: - print(str(err)) - fArgError = True - -# Default to 1 hour worth of data samples. -parseSamples = 3600 -fUsage = False -fVerbose = False -fParseAll = False -fHeader = False -fRunLengths = False - -if not fArgError: - if len(args) > 1: - fArgError = True - else: - for opt, arg in opts: - if opt == "-a": - fParseAll = True - elif opt == "-h": - fUsage = True - elif opt == "-r": - fRunLengths = True - elif opt == "-s": - parseSamples = int(arg) - elif opt == "-v": - fVerbose = True - elif opt == "-H": - fHeader = True - -if fUsage or fArgError: - print("Usage: "+sys.argv[0]+" [options...] []") - print(" where is the file to parse, default: stdin") - print("Options:") - print(" -a: Parse all valid samples") - print(" -h: Be helpful") - print(" -r: Include ping drop run length stats") - print(" -s : Parse data samples, default: "+str(parseSamples)) - print(" -v: Be verbose") - print(" -H: print CSV header instead of parsing file") - sys.exit(1 if fArgError else 0) - -if fHeader: - header = "datetimestamp_utc,samples,total_ping_drop,count_full_ping_drop,count_obstructed,total_obstructed_ping_drop,count_full_obstructed_ping_drop,count_unscheduled,total_unscheduled_ping_drop,count_full_unscheduled_ping_drop" - if fRunLengths: - header+= ",init_run_fragment,final_run_fragment," - header += ",".join("run_seconds_" + str(x) for x in range(1, 61)) + "," - header += ",".join("run_minutes_" + str(x) for x in range(1, 60)) - header += ",run_minutes_60_or_greater" - print(header) - sys.exit(0) - -# Allow "-" to be specified as file for stdin. -if len(args) == 0 or args[0] == "-": - jsonData = json.load(sys.stdin) -else: - jsonFile = open(args[0]) - jsonData = json.load(jsonFile) - jsonFile.close() - -timestamp = datetime.datetime.utcnow() - -historyData = jsonData['dishGetHistory'] - -# 'current' is the count of data samples written to the ring buffer, -# irrespective of buffer wrap. 
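A quick sketch of the counter semantics described in the comment above, using made-up numbers (none of these values come from real dish data):

current = 5000       # hypothetical lifetime count of samples written
nSamples = 3600      # hypothetical ring buffer capacity
offset = current % nSamples
print(offset)        # 1400: index of the oldest valid sample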
-current = int(historyData['current']) -nSamples = len(historyData['popPingDropRate']) - -if fVerbose: - print("current: " + str(current)) - print("All samples: " + str(nSamples)) - -nSamples = min(nSamples,current) - -if fVerbose: - print("Valid samples: " + str(nSamples)) - -# This is ring buffer offset, so both index to oldest data sample and -# index to next data sample after the newest one. -offset = current % nSamples - -tot = 0 -totOne = 0 -totUnsched = 0 -totUnschedD = 0 -totUnschedOne = 0 -totObstruct = 0 -totObstructD = 0 -totObstructOne = 0 - -secondRuns = [0] * 60 -minuteRuns = [0] * 60 -runLength = 0 -initRun = None - -if fParseAll or nSamples < parseSamples: - parseSamples = nSamples - -# Parse the most recent parseSamples-sized set of samples. This will -# iterate samples in order from oldest to newest, although that's not -# actually required for the current set of stats being computed below. -if parseSamples <= offset: - sampleRange = range(offset - parseSamples, offset) -else: - sampleRange = chain(range(nSamples + offset - parseSamples, nSamples), range(0, offset)) - -for i in sampleRange: - d = historyData["popPingDropRate"][i] - tot += d - if d >= 1: - totOne += d - runLength += 1 - elif runLength > 0: - if initRun is None: - initRun = runLength - else: - if runLength <= 60: - secondRuns[runLength-1] += runLength - else: - minuteRuns[min((runLength-1)//60-1, 59)] += runLength - runLength = 0 - elif initRun is None: - initRun = 0 - if not historyData["scheduled"][i]: - totUnsched += 1 - totUnschedD += d - if d >= 1: - totUnschedOne += d - if historyData["obstructed"][i]: - totObstruct += 1 - totObstructD += d - if d >= 1: - totObstructOne += d - -# If the entire sample set is one big drop run, it will be both initial -# fragment (continued from prior sample range) and final one (continued -# to next sample range), but to avoid double-reporting, just call it -# the initial run. -if initRun is None: - initRun = runLength - runLength = 0 - -if fVerbose: - print("Parsed samples: " + str(parseSamples)) - print("Total ping drop: " + str(tot)) - print("Count of drop == 1: " + str(totOne)) - print("Obstructed: " + str(totObstruct)) - print("Obstructed ping drop: " + str(totObstructD)) - print("Obstructed drop == 1: " + str(totObstructOne)) - print("Unscheduled: " + str(totUnsched)) - print("Unscheduled ping drop: " + str(totUnschedD)) - print("Unscheduled drop == 1: " + str(totUnschedOne)) - if fRunLengths: - print("Initial drop run fragment: " + str(initRun)) - print("Final drop run fragment: " + str(runLength)) - print("Per-second drop runs: " + ", ".join(str(x) for x in secondRuns)) - print("Per-minute drop runs: " + ", ".join(str(x) for x in minuteRuns)) -else: - # NOTE: When changing data output format, also change the -H header printing above. - csvData = timestamp.replace(microsecond=0).isoformat() + "," + ",".join(str(x) for x in [parseSamples, tot, totOne, totObstruct, totObstructD, totObstructOne, totUnsched, totUnschedD, totUnschedOne]) - if fRunLengths: - csvData += "," + ",".join(str(x) for x in chain([initRun, runLength], secondRuns, minuteRuns)) - print(csvData) +#!/usr/bin/python +###################################################################### +# +# Example parser for the JSON format history stats output of grpcurl +# for the gRPC service provided on a Starlink user terminal. 
+# +# Expects input as from the following command: +# grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle +# +# This script examines the most recent samples from the history data +# and computes several different metrics related to packet loss. By +# default, it will print the results in CSV format. +# +###################################################################### + +import json +import datetime +import sys +import getopt + +from itertools import chain + +fArgError = False + +try: + opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") +except getopt.GetoptError as err: + print(str(err)) + fArgError = True + +# Default to 1 hour worth of data samples. +parseSamples = 3600 +fUsage = False +fVerbose = False +fParseAll = False +fHeader = False +fRunLengths = False + +if not fArgError: + if len(args) > 1: + fArgError = True + else: + for opt, arg in opts: + if opt == "-a": + fParseAll = True + elif opt == "-h": + fUsage = True + elif opt == "-r": + fRunLengths = True + elif opt == "-s": + parseSamples = int(arg) + elif opt == "-v": + fVerbose = True + elif opt == "-H": + fHeader = True + +if fUsage or fArgError: + print("Usage: "+sys.argv[0]+" [options...] []") + print(" where is the file to parse, default: stdin") + print("Options:") + print(" -a: Parse all valid samples") + print(" -h: Be helpful") + print(" -r: Include ping drop run length stats") + print(" -s : Parse data samples, default: "+str(parseSamples)) + print(" -v: Be verbose") + print(" -H: print CSV header instead of parsing file") + sys.exit(1 if fArgError else 0) + +if fHeader: + header = "datetimestamp_utc,samples,total_ping_drop,count_full_ping_drop,count_obstructed,total_obstructed_ping_drop,count_full_obstructed_ping_drop,count_unscheduled,total_unscheduled_ping_drop,count_full_unscheduled_ping_drop" + if fRunLengths: + header+= ",init_run_fragment,final_run_fragment," + header += ",".join("run_seconds_" + str(x) for x in range(1, 61)) + "," + header += ",".join("run_minutes_" + str(x) for x in range(1, 60)) + header += ",run_minutes_60_or_greater" + print(header) + sys.exit(0) + +# Allow "-" to be specified as file for stdin. +if len(args) == 0 or args[0] == "-": + jsonData = json.load(sys.stdin) +else: + jsonFile = open(args[0]) + jsonData = json.load(jsonFile) + jsonFile.close() + +timestamp = datetime.datetime.utcnow() + +historyData = jsonData['dishGetHistory'] + +# 'current' is the count of data samples written to the ring buffer, +# irrespective of buffer wrap. +current = int(historyData['current']) +nSamples = len(historyData['popPingDropRate']) + +if fVerbose: + print("current: " + str(current)) + print("All samples: " + str(nSamples)) + +nSamples = min(nSamples,current) + +if fVerbose: + print("Valid samples: " + str(nSamples)) + +# This is ring buffer offset, so both index to oldest data sample and +# index to next data sample after the newest one. +offset = current % nSamples + +tot = 0 +totOne = 0 +totUnsched = 0 +totUnschedD = 0 +totUnschedOne = 0 +totObstruct = 0 +totObstructD = 0 +totObstructOne = 0 + +secondRuns = [0] * 60 +minuteRuns = [0] * 60 +runLength = 0 +initRun = None + +if fParseAll or nSamples < parseSamples: + parseSamples = nSamples + +# Parse the most recent parseSamples-sized set of samples. This will +# iterate samples in order from oldest to newest, although that's not +# actually required for the current set of stats being computed below. 
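A standalone sketch of the wrap-around indexing used just below, with made-up buffer values; only the if/else shape is taken from the script:

from itertools import chain

nSamples = 10      # hypothetical count of valid samples in the buffer
offset = 3         # hypothetical index of the oldest sample
parseSamples = 5   # hypothetical number of newest samples to read

if parseSamples <= offset:
    sampleRange = range(offset - parseSamples, offset)
else:
    # Wrap: tail of the buffer first, then the head up through offset-1.
    sampleRange = chain(range(nSamples + offset - parseSamples, nSamples),
                        range(0, offset))

print(list(sampleRange))  # [8, 9, 0, 1, 2]: the newest 5, oldest first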
+if parseSamples <= offset: + sampleRange = range(offset - parseSamples, offset) +else: + sampleRange = chain(range(nSamples + offset - parseSamples, nSamples), range(0, offset)) + +for i in sampleRange: + d = historyData["popPingDropRate"][i] + tot += d + if d >= 1: + totOne += d + runLength += 1 + elif runLength > 0: + if initRun is None: + initRun = runLength + else: + if runLength <= 60: + secondRuns[runLength-1] += runLength + else: + minuteRuns[min((runLength-1)//60-1, 59)] += runLength + runLength = 0 + elif initRun is None: + initRun = 0 + if not historyData["scheduled"][i]: + totUnsched += 1 + totUnschedD += d + if d >= 1: + totUnschedOne += d + if historyData["obstructed"][i]: + totObstruct += 1 + totObstructD += d + if d >= 1: + totObstructOne += d + +# If the entire sample set is one big drop run, it will be both initial +# fragment (continued from prior sample range) and final one (continued +# to next sample range), but to avoid double-reporting, just call it +# the initial run. +if initRun is None: + initRun = runLength + runLength = 0 + +if fVerbose: + print("Parsed samples: " + str(parseSamples)) + print("Total ping drop: " + str(tot)) + print("Count of drop == 1: " + str(totOne)) + print("Obstructed: " + str(totObstruct)) + print("Obstructed ping drop: " + str(totObstructD)) + print("Obstructed drop == 1: " + str(totObstructOne)) + print("Unscheduled: " + str(totUnsched)) + print("Unscheduled ping drop: " + str(totUnschedD)) + print("Unscheduled drop == 1: " + str(totUnschedOne)) + if fRunLengths: + print("Initial drop run fragment: " + str(initRun)) + print("Final drop run fragment: " + str(runLength)) + print("Per-second drop runs: " + ", ".join(str(x) for x in secondRuns)) + print("Per-minute drop runs: " + ", ".join(str(x) for x in minuteRuns)) +else: + # NOTE: When changing data output format, also change the -H header printing above. + csvData = timestamp.replace(microsecond=0).isoformat() + "," + ",".join(str(x) for x in [parseSamples, tot, totOne, totObstruct, totObstructD, totObstructOne, totUnsched, totUnschedD, totUnschedOne]) + if fRunLengths: + csvData += "," + ",".join(str(x) for x in chain([initRun, runLength], secondRuns, minuteRuns)) + print(csvData) From a5036db9e09aff3bb2616915b76f01cd2e2c7566 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 30 Dec 2020 13:09:24 -0800 Subject: [PATCH 2/4] Don't allow a sample to be both unscheduled and obstructed Doesn't ever seem to happen, but in case it does in the future, treat that case as just unscheduled. This way, the unclassified ping loss (AKA "Beta downtime") can be computed from the totals. --- dishHistoryStats.py | 5 ++++- parseJsonHistory.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dishHistoryStats.py b/dishHistoryStats.py index 5dd1a51..1a11d78 100644 --- a/dishHistoryStats.py +++ b/dishHistoryStats.py @@ -150,7 +150,10 @@ for i in sampleRange: totUnschedD += d if d >= 1: totUnschedOne += d - if historyData.obstructed[i]: + # scheduled=false and obstructed=true do not ever appear to overlap, + # but in case they do in the future, treat that as just unscheduled + # in order to avoid double-counting it. 
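Because the elif below keeps the unscheduled and obstructed totals disjoint, the unclassified loss mentioned in the commit message above falls out by subtraction; a sketch with made-up totals:

tot = 42.0           # hypothetical total ping drop
totUnschedD = 30.0   # hypothetical unscheduled ping drop
totObstructD = 5.0   # hypothetical obstructed ping drop
print(tot - totUnschedD - totObstructD)  # 7.0: unclassified ("Beta downtime") loss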
+ elif historyData.obstructed[i]: totObstruct += 1 totObstructD += d if d >= 1: diff --git a/parseJsonHistory.py b/parseJsonHistory.py index b7a993d..0817914 100644 --- a/parseJsonHistory.py +++ b/parseJsonHistory.py @@ -153,7 +153,10 @@ for i in sampleRange: totUnschedD += d if d >= 1: totUnschedOne += d - if historyData["obstructed"][i]: + # scheduled=false and obstructed=true do not ever appear to overlap, + # but in case they do in the future, treat that as just unscheduled + # in order to avoid double-counting it. + elif historyData["obstructed"][i]: totObstruct += 1 totObstructD += d if d >= 1: From d16579155995091e8926299b0104a0a99ed991d0 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 6 Jan 2021 10:12:56 -0800 Subject: [PATCH 3/4] Readability improvements (or so PEP 8 style guide claims...) --- dishDumpStatus.py | 2 +- dishStatusCsv.py | 36 ++++++++++++++++++-------- dishStatusInflux.py | 62 ++++++++++++++++++++++----------------------- dishStatusMqtt.py | 34 ++++++++++++------------- 4 files changed, 74 insertions(+), 60 deletions(-) diff --git a/dishDumpStatus.py b/dishDumpStatus.py index 118d978..1bebb41 100644 --- a/dishDumpStatus.py +++ b/dishDumpStatus.py @@ -12,7 +12,7 @@ import spacex.api.device.device_pb2_grpc # Note that if you remove the 'with' clause here, you need to separately # call channel.close() when you're done with the gRPC connection. -with grpc.insecure_channel('192.168.100.1:9200') as channel: +with grpc.insecure_channel("192.168.100.1:9200") as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) diff --git a/dishStatusCsv.py b/dishStatusCsv.py index 3b8e31b..98269e8 100644 --- a/dishStatusCsv.py +++ b/dishStatusCsv.py @@ -6,14 +6,14 @@ # This script pulls the current status once and prints to stdout. 
# ###################################################################### +import datetime + import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -import datetime - -with grpc.insecure_channel('192.168.100.1:9200') as channel: +with grpc.insecure_channel("192.168.100.1:9200") as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) @@ -27,11 +27,25 @@ alert_bits = 0 for alert in status.alerts.ListFields(): alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) -print(",".join([timestamp.replace(microsecond=0).isoformat(), status.device_info.id, - status.device_info.hardware_version, status.device_info.software_version, - spacex.api.device.dish_pb2.DishState.Name(status.state)]) + "," + - ",".join(str(x) for x in [status.device_state.uptime_s, status.snr, status.seconds_to_first_nonempty_slot, - status.pop_ping_drop_rate, status.downlink_throughput_bps, status.uplink_throughput_bps, - status.pop_ping_latency_ms, alert_bits, status.obstruction_stats.fraction_obstructed, - status.obstruction_stats.currently_obstructed, status.obstruction_stats.last_24h_obstructed_s]) + "," + - ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed)) +csv_data = [ + timestamp.replace(microsecond=0).isoformat(), + status.device_info.id, + status.device_info.hardware_version, + status.device_info.software_version, + spacex.api.device.dish_pb2.DishState.Name(status.state) +] +csv_data.extend(str(x) for x in [ + status.device_state.uptime_s, + status.snr, + status.seconds_to_first_nonempty_slot, + status.pop_ping_drop_rate, + status.downlink_throughput_bps, + status.uplink_throughput_bps, + status.pop_ping_latency_ms, + alert_bits, + status.obstruction_stats.fraction_obstructed, + status.obstruction_stats.currently_obstructed, + status.obstruction_stats.last_24h_obstructed_s +]) +csv_data.extend(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed) +print(",".join(csv_data)) diff --git a/dishStatusInflux.py b/dishStatusInflux.py index e8658bb..b6ab371 100644 --- a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -7,6 +7,8 @@ # the specified InfluxDB database in a loop. 
# ###################################################################### +import time + from influxdb import InfluxDBClient from influxdb import SeriesHelper @@ -15,10 +17,8 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -import time - -fVerbose = True -sleepTime = 30 +verbose = True +sleep_time = 30 class DeviceStatusSeries(SeriesHelper): class Meta: @@ -41,20 +41,20 @@ class DeviceStatusSeries(SeriesHelper): "fraction_obstructed"] tags = ["id"] -influxClient = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15) +influx_client = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15) try: - dishChannel = None - lastId = None - fLastFailed = False + dish_channel = None + last_id = None + last_failed = False pending = 0 count = 0 while True: try: - if dishChannel is None: - dishChannel = grpc.insecure_channel("192.168.100.1:9200") - stub = spacex.api.device.device_pb2_grpc.DeviceStub(dishChannel) + if dish_channel is None: + dish_channel = grpc.insecure_channel("192.168.100.1:9200") + stub = spacex.api.device.device_pb2_grpc.DeviceStub(dish_channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) status = response.dish_get_status DeviceStatusSeries( @@ -74,45 +74,45 @@ try: pop_ping_latency_ms=status.pop_ping_latency_ms, currently_obstructed=status.obstruction_stats.currently_obstructed, fraction_obstructed=status.obstruction_stats.fraction_obstructed) - lastId = status.device_info.id - fLastFailed = False - except Exception as e: - if not dishChannel is None: - dishChannel.close() - dishChannel = None - if fLastFailed: - if not lastId is None: - DeviceStatusSeries(id=lastId, state="DISH_UNREACHABLE") + last_id = status.device_info.id + last_failed = False + except grpc.RpcError: + if dish_channel is not None: + dish_channel.close() + dish_channel = None + if last_failed: + if last_id is not None: + DeviceStatusSeries(id=last_id, state="DISH_UNREACHABLE") else: # Retry once, because the connection may have been lost while # we were sleeping - fLastFailed = True + last_failed = True continue pending = pending + 1 - if fVerbose: + if verbose: print("Samples: " + str(pending)) count = count + 1 if count > 5: try: - DeviceStatusSeries.commit(influxClient) - if fVerbose: + DeviceStatusSeries.commit(influx_client) + if verbose: print("Wrote " + str(pending)) pending = 0 except Exception as e: print("Failed to write: " + str(e)) count = 0 - if sleepTime > 0: - time.sleep(sleepTime) + if sleep_time > 0: + time.sleep(sleep_time) else: break finally: # Flush on error/exit try: - DeviceStatusSeries.commit(influxClient) - if fVerbose: + DeviceStatusSeries.commit(influx_client) + if verbose: print("Wrote " + str(pending)) except Exception as e: print("Failed to write: " + str(e)) - influxClient.close() - if not dishChannel is None: - dishChannel.close() + influx_client.close() + if dish_channel is not None: + dish_channel.close() diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py index a0993cc..9baaddd 100644 --- a/dishStatusMqtt.py +++ b/dishStatusMqtt.py @@ -14,7 +14,7 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -with grpc.insecure_channel('192.168.100.1:9200') as channel: +with grpc.insecure_channel("192.168.100.1:9200") as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = 
stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) @@ -26,25 +26,25 @@ alert_bits = 0 for alert in status.alerts.ListFields(): alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) -topicPrefix = "starlink/dish_status/" + status.device_info.id + "/" -msgs = [(topicPrefix + "hardware_version", status.device_info.hardware_version, 0, False), - (topicPrefix + "software_version", status.device_info.software_version, 0, False), - (topicPrefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), - (topicPrefix + "uptime", status.device_state.uptime_s, 0, False), - (topicPrefix + "snr", status.snr, 0, False), - (topicPrefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), - (topicPrefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), - (topicPrefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False), - (topicPrefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False), - (topicPrefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False), - (topicPrefix + "alerts", alert_bits, 0, False), - (topicPrefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False), - (topicPrefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False), +topic_prefix = "starlink/dish_status/" + status.device_info.id + "/" +msgs = [(topic_prefix + "hardware_version", status.device_info.hardware_version, 0, False), + (topic_prefix + "software_version", status.device_info.software_version, 0, False), + (topic_prefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), + (topic_prefix + "uptime", status.device_state.uptime_s, 0, False), + (topic_prefix + "snr", status.snr, 0, False), + (topic_prefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), + (topic_prefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), + (topic_prefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False), + (topic_prefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False), + (topic_prefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False), + (topic_prefix + "alerts", alert_bits, 0, False), + (topic_prefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False), + (topic_prefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False), # While the field name for this one implies it covers 24 hours, the # empirical evidence suggests it only covers 12 hours. It also resets # on dish reboot, so may not cover that whole period. 
Rather than try # to convey that complexity in the topic label, just be a bit vague: - (topicPrefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), - (topicPrefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)] + (topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), + (topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)] paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id) From 170dd2daae812cda036408926f9094525da4bbbe Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 6 Jan 2021 11:46:50 -0800 Subject: [PATCH 4/4] Reorganize history parsing logic into a separate module Moves the parsing logic that will be shared by some upcoming scripts out into a separate module. There's still just as much duplication between the JSON parser and the grpc parser as there was before, but this should at least prevent further duplication of this logic. This also adds some proper documentation for what each of the stats means. --- dishHistoryStats.py | 204 ++++++++++++-------------------------- parseJsonHistory.py | 202 ++++++++++++------------------------- starlink_grpc.py | 235 ++++++++++++++++++++++++++++++++++++++++++++ starlink_json.py | 178 +++++++++++++++++++++++++++++++++ 4 files changed, 539 insertions(+), 280 deletions(-) create mode 100644 starlink_grpc.py create mode 100644 starlink_json.py diff --git a/dishHistoryStats.py b/dishHistoryStats.py index 1a11d78..b9aeb58 100644 --- a/dishHistoryStats.py +++ b/dishHistoryStats.py @@ -2,7 +2,7 @@ ###################################################################### # # Equivalent script to parseJsonHistory.py, except integrating the -# gRPC calls, instead of relying on separatate invocation of grpcurl. +# gRPC calls, instead of relying on separate invocation of grpcurl. # # This script examines the most recent samples from the history data # and computes several different metrics related to packet loss. By @@ -10,181 +10,103 @@ # ###################################################################### -import grpc - -import spacex.api.device.device_pb2 -import spacex.api.device.device_pb2_grpc - import datetime import sys import getopt -from itertools import chain +import starlink_grpc - -fArgError = False +arg_error = False try: opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") except getopt.GetoptError as err: print(str(err)) - fArgError = True + arg_error = True # Default to 1 hour worth of data samples. 
-parseSamples = 3600 -fUsage = False -fVerbose = False -fParseAll = False -fHeader = False -fRunLengths = False +samples_default = 3600 +samples = samples_default +print_usage = False +verbose = False +parse_all = False +print_header = False +run_lengths = False -if not fArgError: +if not arg_error: if len(args) > 0: - fArgError = True + arg_error = True else: for opt, arg in opts: if opt == "-a": - fParseAll = True + parse_all = True elif opt == "-h": - fUsage = True + print_usage = True elif opt == "-r": - fRunLengths = True + run_lengths = True elif opt == "-s": - parseSamples = int(arg) + samples = int(arg) elif opt == "-v": - fVerbose = True + verbose = True elif opt == "-H": - fHeader = True + print_header = True -if fUsage or fArgError: - print("Usage: "+sys.argv[0]+" [options...]") +if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") print("Options:") print(" -a: Parse all valid samples") print(" -h: Be helpful") print(" -r: Include ping drop run length stats") - print(" -s : Parse data samples, default: "+str(parseSamples)) + print(" -s : Parse data samples, default: " + str(samples_default)) print(" -v: Be verbose") print(" -H: print CSV header instead of parsing file") - sys.exit(1 if fArgError else 0) + sys.exit(1 if arg_error else 0) -if fHeader: - header = "datetimestamp_utc,samples,total_ping_drop,count_full_ping_drop,count_obstructed,total_obstructed_ping_drop,count_full_obstructed_ping_drop,count_unscheduled,total_unscheduled_ping_drop,count_full_unscheduled_ping_drop" - if fRunLengths: - header+= ",init_run_fragment,final_run_fragment," - header += ",".join("run_seconds_" + str(x) for x in range(1, 61)) + "," - header += ",".join("run_minutes_" + str(x) for x in range(1, 60)) - header += ",run_minutes_60_or_greater" - print(header) +fields, rl_fields = starlink_grpc.history_ping_field_names() + +if print_header: + header = ["datetimestamp_utc"] + header.extend(fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + header.extend(field + "_" + str(x) for x in range(1, 61)) + else: + header.append(field) + print(",".join(header)) sys.exit(0) -with grpc.insecure_channel('192.168.100.1:9200') as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) - timestamp = datetime.datetime.utcnow() -historyData = response.dish_get_history +stats, rl_stats = starlink_grpc.history_ping_stats(-1 if parse_all else samples, + verbose) -# 'current' is the count of data samples written to the ring buffer, -# irrespective of buffer wrap. -current = int(historyData.current) -nSamples = len(historyData.pop_ping_drop_rate) +if stats is None or rl_stats is None: + # verbose output already happened, so just bail. + sys.exit(1) -if fVerbose: - print("current: " + str(current)) - print("All samples: " + str(nSamples)) - -nSamples = min(nSamples,current) - -if fVerbose: - print("Valid samples: " + str(nSamples)) - -# This is ring buffer offset, so both index to oldest data sample and -# index to next data sample after the newest one. -offset = current % nSamples - -tot = 0 -totOne = 0 -totUnsched = 0 -totUnschedD = 0 -totUnschedOne = 0 -totObstruct = 0 -totObstructD = 0 -totObstructOne = 0 - -secondRuns = [0] * 60 -minuteRuns = [0] * 60 -runLength = 0 -initRun = None - -if fParseAll or nSamples < parseSamples: - parseSamples = nSamples - -# Parse the most recent parseSamples-sized set of samples. 
This will -# iterate samples in order from oldest to newest, although that's not -# actually required for the current set of stats being computed below. -if parseSamples <= offset: - sampleRange = range(offset - parseSamples, offset) +if verbose: + print("Parsed samples: " + str(stats["samples"])) + print("Total ping drop: " + str(stats["total_ping_drop"])) + print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) + print("Obstructed: " + str(stats["count_obstructed"])) + print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) + print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) + print("Unscheduled: " + str(stats["count_unscheduled"])) + print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) + print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) + if run_lengths: + print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) + print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) + print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"])) + print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) else: - sampleRange = chain(range(nSamples + offset - parseSamples, nSamples), range(0, offset)) - -for i in sampleRange: - d = historyData.pop_ping_drop_rate[i] - tot += d - if d >= 1: - totOne += d - runLength += 1 - elif runLength > 0: - if initRun is None: - initRun = runLength - else: - if runLength <= 60: - secondRuns[runLength-1] += runLength + csv_data = [timestamp.replace(microsecond=0).isoformat()] + csv_data.extend(str(stats[field]) for field in fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + csv_data.extend(str(substat) for substat in rl_stats[field]) else: - minuteRuns[min((runLength-1)//60-1, 59)] += runLength - runLength = 0 - elif initRun is None: - initRun = 0 - if not historyData.scheduled[i]: - totUnsched += 1 - totUnschedD += d - if d >= 1: - totUnschedOne += d - # scheduled=false and obstructed=true do not ever appear to overlap, - # but in case they do in the future, treat that as just unscheduled - # in order to avoid double-counting it. - elif historyData.obstructed[i]: - totObstruct += 1 - totObstructD += d - if d >= 1: - totObstructOne += d - -# If the entire sample set is one big drop run, it will be both initial -# fragment (continued from prior sample range) and final one (continued -# to next sample range), but to avoid double-reporting, just call it -# the initial run. -if initRun is None: - initRun = runLength - runLength = 0 - -if fVerbose: - print("Parsed samples: " + str(parseSamples)) - print("Total ping drop: " + str(tot)) - print("Count of drop == 1: " + str(totOne)) - print("Obstructed: " + str(totObstruct)) - print("Obstructed ping drop: " + str(totObstructD)) - print("Obstructed drop == 1: " + str(totObstructOne)) - print("Unscheduled: " + str(totUnsched)) - print("Unscheduled ping drop: " + str(totUnschedD)) - print("Unscheduled drop == 1: " + str(totUnschedOne)) - if fRunLengths: - print("Initial drop run fragment: " + str(initRun)) - print("Final drop run fragment: " + str(runLength)) - print("Per-second drop runs: " + ", ".join(str(x) for x in secondRuns)) - print("Per-minute drop runs: " + ", ".join(str(x) for x in minuteRuns)) -else: - # NOTE: When changing data output format, also change the -H header printing above. 
- csvData = timestamp.replace(microsecond=0).isoformat() + "," + ",".join(str(x) for x in [parseSamples, tot, totOne, totObstruct, totObstructD, totObstructOne, totUnsched, totUnschedD, totUnschedOne]) - if fRunLengths: - csvData += "," + ",".join(str(x) for x in chain([initRun, runLength], secondRuns, minuteRuns)) - print(csvData) + csv_data.append(str(rl_stats[field])) + print(",".join(csv_data)) diff --git a/parseJsonHistory.py b/parseJsonHistory.py index 0817914..9357847 100644 --- a/parseJsonHistory.py +++ b/parseJsonHistory.py @@ -13,181 +13,105 @@ # ###################################################################### -import json import datetime import sys import getopt -from itertools import chain +import starlink_json -fArgError = False +arg_error = False try: opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") except getopt.GetoptError as err: print(str(err)) - fArgError = True + arg_error = True # Default to 1 hour worth of data samples. -parseSamples = 3600 -fUsage = False -fVerbose = False -fParseAll = False -fHeader = False -fRunLengths = False +samples_default = 3600 +samples = samples_default +print_usage = False +verbose = False +parse_all = False +print_header = False +run_lengths = False -if not fArgError: +if not arg_error: if len(args) > 1: - fArgError = True + arg_error = True else: for opt, arg in opts: if opt == "-a": - fParseAll = True + parse_all = True elif opt == "-h": - fUsage = True + print_usage = True elif opt == "-r": - fRunLengths = True + run_lengths = True elif opt == "-s": - parseSamples = int(arg) + samples = int(arg) elif opt == "-v": - fVerbose = True + verbose = True elif opt == "-H": - fHeader = True + print_header = True -if fUsage or fArgError: - print("Usage: "+sys.argv[0]+" [options...] []") +if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...] []") print(" where is the file to parse, default: stdin") print("Options:") print(" -a: Parse all valid samples") print(" -h: Be helpful") print(" -r: Include ping drop run length stats") - print(" -s : Parse data samples, default: "+str(parseSamples)) + print(" -s : Parse data samples, default: " + str(samples_default)) print(" -v: Be verbose") print(" -H: print CSV header instead of parsing file") - sys.exit(1 if fArgError else 0) + sys.exit(1 if arg_error else 0) -if fHeader: - header = "datetimestamp_utc,samples,total_ping_drop,count_full_ping_drop,count_obstructed,total_obstructed_ping_drop,count_full_obstructed_ping_drop,count_unscheduled,total_unscheduled_ping_drop,count_full_unscheduled_ping_drop" - if fRunLengths: - header+= ",init_run_fragment,final_run_fragment," - header += ",".join("run_seconds_" + str(x) for x in range(1, 61)) + "," - header += ",".join("run_minutes_" + str(x) for x in range(1, 60)) - header += ",run_minutes_60_or_greater" - print(header) +fields, rl_fields = starlink_json.history_ping_field_names() + +if print_header: + header = ["datetimestamp_utc"] + header.extend(fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + header.extend(field + "_" + str(x) for x in range(1, 61)) + else: + header.append(field) + print(",".join(header)) sys.exit(0) -# Allow "-" to be specified as file for stdin. 
-if len(args) == 0 or args[0] == "-": - jsonData = json.load(sys.stdin) -else: - jsonFile = open(args[0]) - jsonData = json.load(jsonFile) - jsonFile.close() - timestamp = datetime.datetime.utcnow() -historyData = jsonData['dishGetHistory'] +stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", + -1 if parse_all else samples, + verbose) -# 'current' is the count of data samples written to the ring buffer, -# irrespective of buffer wrap. -current = int(historyData['current']) -nSamples = len(historyData['popPingDropRate']) +if stats is None or rl_stats is None: + # verbose output already happened, so just bail. + sys.exit(1) -if fVerbose: - print("current: " + str(current)) - print("All samples: " + str(nSamples)) - -nSamples = min(nSamples,current) - -if fVerbose: - print("Valid samples: " + str(nSamples)) - -# This is ring buffer offset, so both index to oldest data sample and -# index to next data sample after the newest one. -offset = current % nSamples - -tot = 0 -totOne = 0 -totUnsched = 0 -totUnschedD = 0 -totUnschedOne = 0 -totObstruct = 0 -totObstructD = 0 -totObstructOne = 0 - -secondRuns = [0] * 60 -minuteRuns = [0] * 60 -runLength = 0 -initRun = None - -if fParseAll or nSamples < parseSamples: - parseSamples = nSamples - -# Parse the most recent parseSamples-sized set of samples. This will -# iterate samples in order from oldest to newest, although that's not -# actually required for the current set of stats being computed below. -if parseSamples <= offset: - sampleRange = range(offset - parseSamples, offset) +if verbose: + print("Parsed samples: " + str(stats["samples"])) + print("Total ping drop: " + str(stats["total_ping_drop"])) + print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) + print("Obstructed: " + str(stats["count_obstructed"])) + print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) + print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) + print("Unscheduled: " + str(stats["count_unscheduled"])) + print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) + print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) + if run_lengths: + print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) + print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) + print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"])) + print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) else: - sampleRange = chain(range(nSamples + offset - parseSamples, nSamples), range(0, offset)) - -for i in sampleRange: - d = historyData["popPingDropRate"][i] - tot += d - if d >= 1: - totOne += d - runLength += 1 - elif runLength > 0: - if initRun is None: - initRun = runLength - else: - if runLength <= 60: - secondRuns[runLength-1] += runLength + csv_data = [timestamp.replace(microsecond=0).isoformat()] + csv_data.extend(str(stats[field]) for field in fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + csv_data.extend(str(substat) for substat in rl_stats[field]) else: - minuteRuns[min((runLength-1)//60-1, 59)] += runLength - runLength = 0 - elif initRun is None: - initRun = 0 - if not historyData["scheduled"][i]: - totUnsched += 1 - totUnschedD += d - if d >= 1: - totUnschedOne += d - # scheduled=false and obstructed=true do not ever appear to overlap, - # but in case they do in the future, treat that as just unscheduled - # in order to avoid 
double-counting it. - elif historyData["obstructed"][i]: - totObstruct += 1 - totObstructD += d - if d >= 1: - totObstructOne += d - -# If the entire sample set is one big drop run, it will be both initial -# fragment (continued from prior sample range) and final one (continued -# to next sample range), but to avoid double-reporting, just call it -# the initial run. -if initRun is None: - initRun = runLength - runLength = 0 - -if fVerbose: - print("Parsed samples: " + str(parseSamples)) - print("Total ping drop: " + str(tot)) - print("Count of drop == 1: " + str(totOne)) - print("Obstructed: " + str(totObstruct)) - print("Obstructed ping drop: " + str(totObstructD)) - print("Obstructed drop == 1: " + str(totObstructOne)) - print("Unscheduled: " + str(totUnsched)) - print("Unscheduled ping drop: " + str(totUnschedD)) - print("Unscheduled drop == 1: " + str(totUnschedOne)) - if fRunLengths: - print("Initial drop run fragment: " + str(initRun)) - print("Final drop run fragment: " + str(runLength)) - print("Per-second drop runs: " + ", ".join(str(x) for x in secondRuns)) - print("Per-minute drop runs: " + ", ".join(str(x) for x in minuteRuns)) -else: - # NOTE: When changing data output format, also change the -H header printing above. - csvData = timestamp.replace(microsecond=0).isoformat() + "," + ",".join(str(x) for x in [parseSamples, tot, totOne, totObstruct, totObstructD, totObstructOne, totUnsched, totUnschedD, totUnschedOne]) - if fRunLengths: - csvData += "," + ",".join(str(x) for x in chain([initRun, runLength], secondRuns, minuteRuns)) - print(csvData) + csv_data.append(str(rl_stats[field])) + print(",".join(csv_data)) diff --git a/starlink_grpc.py b/starlink_grpc.py new file mode 100644 index 0000000..3401863 --- /dev/null +++ b/starlink_grpc.py @@ -0,0 +1,235 @@ +"""Helpers for grpc communication with a Starlink user terminal. + +This module may eventually contain more expansive parsing logic, but for now +it contains functions to parse the history data for some specific packet loss +statistics. + +General ping drop (packet loss) statistics: + This group of statistics characterize the packet loss (labeled "ping drop" + in the field names of the Starlink gRPC service protocol) in various ways. + + The sample interval is currently 1 second. + + samples: The number of valid samples analyzed. + total_ping_drop: The total amount of time, in sample intervals, that + experienced ping drop. + count_full_ping_drop: The number of samples that experienced 100% + ping drop. + count_obstructed: The number of samples that were marked as + "obstructed", regardless of whether they experienced any ping + drop. + total_obstructed_ping_drop: The total amount of time, in sample + intervals, that experienced ping drop in samples marked as + "obstructed". + count_full_obstructed_ping_drop: The number of samples that were + marked as "obstructed" and that experienced 100% ping drop. + count_unscheduled: The number of samples that were not marked as + "scheduled", regardless of whether they experienced any ping + drop. + total_unscheduled_ping_drop: The total amount of time, in sample + intervals, that experienced ping drop in samples not marked as + "scheduled". + count_full_unscheduled_ping_drop: The number of samples that were + not marked as "scheduled" and that experienced 100% ping drop. + + Total packet loss ratio can be computed with total_ping_drop / samples. 
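    For example, with invented values (this snippet is not part of the
    module itself):

    stats = {"samples": 3600, "total_ping_drop": 18.5}  # hypothetical values
    loss_ratio = stats["total_ping_drop"] / stats["samples"]
    print("{:.2%}".format(loss_ratio))  # 0.51%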
+ +Ping drop run length statistics: + This group of statistics characterizes packet loss by how long a + consecutive run of 100% packet loss lasts. + + init_run_fragment: The number of consecutive sample periods at the + start of the sample set that experienced 100% ping drop. This + period may be a continuation of a run that started prior to the + sample set, so is not counted in the following stats. + final_run_fragment: The number of consecutive sample periods at the + end of the sample set that experienced 100% ping drop. This + period may continue as a run beyond the end of the sample set, so + is not counted in the following stats. + run_seconds: A 60 element list. Each element records the total amount + of time, in sample intervals, that experienced 100% ping drop in + a consecutive run that lasted for (list index + 1) sample + intervals (seconds). That is, the first element contains time + spent in 1 sample runs, the second element contains time spent in + 2 sample runs, etc. + run_minutes: A 60 element list. Each element records the total amount + of time, in sample intervals, that experienced 100% ping drop in + a consecutive run that lasted for more that (list index + 1) + multiples of 60 sample intervals (minutes), but less than or equal + to (list index + 2) multiples of 60 sample intervals. Except for + the last element in the list, which records the total amount of + time in runs of more than 60*60 samples. + + No sample should be counted in more than one of the run length stats or + stat elements, so the total of all of them should be equal to + count_full_ping_drop from the general stats. + + Samples that experience less than 100% ping drop are not counted in this + group of stats, even if they happen at the beginning or end of a run of + 100% ping drop samples. To compute the amount of time that experienced + ping loss in less than a single run of 100% ping drop, use + (total_ping_drop - count_full_ping_drop) from the general stats. +""" + +from itertools import chain + +import grpc + +import spacex.api.device.device_pb2 +import spacex.api.device.device_pb2_grpc + +def history_ping_field_names(): + """Return the field names of the packet loss stats. + + Returns: + A tuple with 2 lists, the first with general stat names and the + second with ping drop run length stat names. + """ + return [ + "samples", + "total_ping_drop", + "count_full_ping_drop", + "count_obstructed", + "total_obstructed_ping_drop", + "count_full_obstructed_ping_drop", + "count_unscheduled", + "total_unscheduled_ping_drop", + "count_full_unscheduled_ping_drop" + ], [ + "init_run_fragment", + "final_run_fragment", + "run_seconds", + "run_minutes" + ] + +def get_history(): + """Fetch history data and return it in grpc structure format. + + Raises: + grpc.RpcError: Communication or service error. + """ + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) + return response.dish_get_history + +def history_ping_stats(parse_samples, verbose=False): + """Fetch, parse, and compute the packet loss stats. + + Args: + parse_samples (int): Number of samples to process, or -1 to parse all + available samples. + verbose (bool): Optionally produce verbose output. + + Returns: + On success, a tuple with 2 dicts, the first mapping general stat names + to their values and the second mapping ping drop run length stat names + to their values. 
+ + On failure, the tuple (None, None). + """ + try: + history = get_history() + except grpc.RpcError: + if verbose: + # RpcError is too verbose to print the details. + print("Failed getting history") + return None, None + + # 'current' is the count of data samples written to the ring buffer, + # irrespective of buffer wrap. + current = int(history.current) + samples = len(history.pop_ping_drop_rate) + + if verbose: + print("current counter: " + str(current)) + print("All samples: " + str(samples)) + + samples = min(samples, current) + + if verbose: + print("Valid samples: " + str(samples)) + + # This is ring buffer offset, so both index to oldest data sample and + # index to next data sample after the newest one. + offset = current % samples + + tot = 0 + count_full_drop = 0 + count_unsched = 0 + total_unsched_drop = 0 + count_full_unsched = 0 + count_obstruct = 0 + total_obstruct_drop = 0 + count_full_obstruct = 0 + + second_runs = [0] * 60 + minute_runs = [0] * 60 + run_length = 0 + init_run_length = None + + if parse_samples < 0 or samples < parse_samples: + parse_samples = samples + + # Parse the most recent parse_samples-sized set of samples. This will + # iterate samples in order from oldest to newest. + if parse_samples <= offset: + sample_range = range(offset - parse_samples, offset) + else: + sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) + + for i in sample_range: + d = history.pop_ping_drop_rate[i] + tot += d + if d >= 1: + count_full_drop += d + run_length += 1 + elif run_length > 0: + if init_run_length is None: + init_run_length = run_length + else: + if run_length <= 60: + second_runs[run_length - 1] += run_length + else: + minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length + run_length = 0 + elif init_run_length is None: + init_run_length = 0 + if not history.scheduled[i]: + count_unsched += 1 + total_unsched_drop += d + if d >= 1: + count_full_unsched += d + # scheduled=false and obstructed=true do not ever appear to overlap, + # but in case they do in the future, treat that as just unscheduled + # in order to avoid double-counting it. + elif history.obstructed[i]: + count_obstruct += 1 + total_obstruct_drop += d + if d >= 1: + count_full_obstruct += d + + # If the entire sample set is one big drop run, it will be both initial + # fragment (continued from prior sample range) and final one (continued + # to next sample range), but to avoid double-reporting, just call it + # the initial run. + if init_run_length is None: + init_run_length = run_length + run_length = 0 + + return { + "samples": parse_samples, + "total_ping_drop": tot, + "count_full_ping_drop": count_full_drop, + "count_obstructed": count_obstruct, + "total_obstructed_ping_drop": total_obstruct_drop, + "count_full_obstructed_ping_drop": count_full_obstruct, + "count_unscheduled": count_unsched, + "total_unscheduled_ping_drop": total_unsched_drop, + "count_full_unscheduled_ping_drop": count_full_unsched + }, { + "init_run_fragment": init_run_length, + "final_run_fragment": run_length, + "run_seconds": second_runs, + "run_minutes": minute_runs + } diff --git a/starlink_json.py b/starlink_json.py new file mode 100644 index 0000000..ca70547 --- /dev/null +++ b/starlink_json.py @@ -0,0 +1,178 @@ +"""Parser for JSON format gRPC output from a Starlink user terminal. + +Expects input as from grpcurl get_history request. 
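A minimal usage sketch for this module, assuming a grpcurl dump was saved to a hypothetical file named history.json:

import starlink_json

# "history.json" is a hypothetical capture file; -1 parses all valid samples.
stats, rl_stats = starlink_json.history_ping_stats("history.json", -1)
if stats is None:
    print("Parsing failed")
else:
    print("Loss ratio: {:.2%}".format(stats["total_ping_drop"] / stats["samples"]))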
+ +Handling output for other request responses may be added in the future, but +the others don't really need as much interpretation as the get_history +response does. + +See the starlink_grpc module docstring for descriptions of the stat elements. +""" + +import json +import sys + +from itertools import chain + +def history_ping_field_names(): + """Return the field names of the packet loss stats. + + Returns: + A tuple with 2 lists, the first with general stat names and the + second with ping drop run length stat names. + """ + return [ + "samples", + "total_ping_drop", + "count_full_ping_drop", + "count_obstructed", + "total_obstructed_ping_drop", + "count_full_obstructed_ping_drop", + "count_unscheduled", + "total_unscheduled_ping_drop", + "count_full_unscheduled_ping_drop" + ], [ + "init_run_fragment", + "final_run_fragment", + "run_seconds", + "run_minutes" + ] + +def get_history(filename): + """Read JSON data and return the raw history in dict format. + + Args: + filename (str): Filename from which to read JSON data, or "-" to read + from standard input. + """ + if filename == "-": + json_data = json.load(sys.stdin) + else: + json_file = open(filename) + try: + json_data = json.load(json_file) + finally: + json_file.close() + return json_data["dishGetHistory"] + +def history_ping_stats(filename, parse_samples, verbose=False): + """Fetch, parse, and compute the packet loss stats. + + Args: + filename (str): Filename from which to read JSON data, or "-" to read + from standard input. + parse_samples (int): Number of samples to process, or -1 to parse all + available samples. + verbose (bool): Optionally produce verbose output. + + Returns: + On success, a tuple with 2 dicts, the first mapping general stat names + to their values and the second mapping ping drop run length stat names + to their values. + + On failure, the tuple (None, None). + """ + try: + history = get_history(filename) + except Exception as e: + if verbose: + print("Failed getting history: " + str(e)) + return None, None + + # "current" is the count of data samples written to the ring buffer, + # irrespective of buffer wrap. + current = int(history["current"]) + samples = len(history["popPingDropRate"]) + + if verbose: + print("current counter: " + str(current)) + print("All samples: " + str(samples)) + + samples = min(samples, current) + + if verbose: + print("Valid samples: " + str(samples)) + + # This is ring buffer offset, so both index to oldest data sample and + # index to next data sample after the newest one. + offset = current % samples + + tot = 0 + count_full_drop = 0 + count_unsched = 0 + total_unsched_drop = 0 + count_full_unsched = 0 + count_obstruct = 0 + total_obstruct_drop = 0 + count_full_obstruct = 0 + + second_runs = [0] * 60 + minute_runs = [0] * 60 + run_length = 0 + init_run_length = None + + if parse_samples < 0 or samples < parse_samples: + parse_samples = samples + + # Parse the most recent parse_samples-sized set of samples. This will + # iterate samples in order from oldest to newest. 
+ if parse_samples <= offset: + sample_range = range(offset - parse_samples, offset) + else: + sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) + + for i in sample_range: + d = history["popPingDropRate"][i] + tot += d + if d >= 1: + count_full_drop += d + run_length += 1 + elif run_length > 0: + if init_run_length is None: + init_run_length = run_length + else: + if run_length <= 60: + second_runs[run_length - 1] += run_length + else: + minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length + run_length = 0 + elif init_run_length is None: + init_run_length = 0 + if not history["scheduled"][i]: + count_unsched += 1 + total_unsched_drop += d + if d >= 1: + count_full_unsched += d + # scheduled=false and obstructed=true do not ever appear to overlap, + # but in case they do in the future, treat that as just unscheduled + # in order to avoid double-counting it. + elif history["obstructed"][i]: + count_obstruct += 1 + total_obstruct_drop += d + if d >= 1: + count_full_obstruct += d + + # If the entire sample set is one big drop run, it will be both initial + # fragment (continued from prior sample range) and final one (continued + # to next sample range), but to avoid double-reporting, just call it + # the initial run. + if init_run_length is None: + init_run_length = run_length + run_length = 0 + + return { + "samples": parse_samples, + "total_ping_drop": tot, + "count_full_ping_drop": count_full_drop, + "count_obstructed": count_obstruct, + "total_obstructed_ping_drop": total_obstruct_drop, + "count_full_obstructed_ping_drop": count_full_obstruct, + "count_unscheduled": count_unsched, + "total_unscheduled_ping_drop": total_unsched_drop, + "count_full_unscheduled_ping_drop": count_full_unsched + }, { + "init_run_fragment": init_run_length, + "final_run_fragment": run_length, + "run_seconds": second_runs, + "run_minutes": minute_runs + }
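A minimal sketch of driving the grpc module the same way dishHistoryStats.py does above; the 3600-sample count mirrors that script's one-hour default:

import starlink_grpc

fields, rl_fields = starlink_grpc.history_ping_field_names()
stats, rl_stats = starlink_grpc.history_ping_stats(3600)  # most recent hour
if stats is not None:
    # One CSV row of the general stats, in the canonical field order.
    print(",".join(str(stats[field]) for field in fields))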