Implement sample counter tracking in bulk mode

Add tracking of exactly which samples have already been sent off to InfluxDB so that samples are neither missed nor repeated due to minor time deltas in OS task scheduling. For now, this is only being applied to bulk mode.

Make the -s option only apply to the first loop iteration for bulk mode, since subsequent loops will want to pick up all samples since the prior iteration.

Also, omit the latency field from the data point sent to InfluxDB for samples where the ping drop is 100%. The raw history data apparently just repeats the prior value in this case, probably because it cannot just leave a hole in the data array and there is no good way to indicate an invalid value.

Related to issue #5
This commit is contained in:
sparky8512 2021-01-18 13:30:34 -08:00
parent 0663008be7
commit 9b96c5dcc6
2 changed files with 154 additions and 49 deletions

View file

@ -1,17 +1,25 @@
#!/usr/bin/python3 #!/usr/bin/python3
###################################################################### ######################################################################
# #
# Write Starlink user terminal packet loss statistics to an InfluxDB # Write Starlink user terminal packet loss, latency, and usage data
# database. # to an InfluxDB database.
# #
# This script examines the most recent samples from the history data, # This script examines the most recent samples from the history data,
# computes several different metrics related to packet loss, and # and either writes them in whole, or computes several different
# writes those to the specified InfluxDB database. # metrics related to packet loss and writes those, to the specified
# InfluxDB database.
#
# NOTE: The Starlink user terminal does not include time values with
# its history or status data, so this script uses current system time
# to compute the timestamps it sends to InfluxDB. It is recommended
# to run this script on a host that has its system clock synced via
# NTP. Otherwise, the timestamps may get out of sync with real time.
# #
###################################################################### ######################################################################
import getopt import getopt
import datetime from datetime import datetime
from datetime import timezone
import logging import logging
import os import os
import signal import signal
@ -140,8 +148,9 @@ def main():
print(" -n <name>: Hostname of InfluxDB server, default: " + host_default) print(" -n <name>: Hostname of InfluxDB server, default: " + host_default)
print(" -p <num>: Port number to use on InfluxDB server") print(" -p <num>: Port number to use on InfluxDB server")
print(" -r: Include ping drop run length stats") print(" -r: Include ping drop run length stats")
print(" -s <num>: Number of data samples to parse, default: loop interval,") print(" -s <num>: Number of data samples to parse; in bulk mode, applies to first")
print(" if set, else " + str(samples_default)) print(" loop iteration only, default: loop interval, if set, else " +
str(samples_default))
print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " + print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
str(default_loop_time)) str(default_loop_time))
print(" -v: Be verbose") print(" -v: Be verbose")
@ -165,6 +174,8 @@ def main():
gstate = GlobalState() gstate = GlobalState()
gstate.dish_id = None gstate.dish_id = None
gstate.points = [] gstate.points = []
gstate.counter = None
gstate.timestamp = None
def conn_error(msg, *args): def conn_error(msg, *args):
# Connection errors that happen in an interval loop are not critical # Connection errors that happen in an interval loop are not critical
@ -187,23 +198,39 @@ def main():
return 0 return 0
def process_bulk_data(): def process_bulk_data():
timestamp = datetime.datetime.utcnow() # need to pull this now in case it is needed later
now = time.time()
general, bulk = starlink_grpc.history_bulk_data(samples, verbose) start = gstate.counter
parse_samples = samples if start is None else -1
general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose)
parsed_samples = general["samples"] parsed_samples = general["samples"]
new_counter = general["current"]
timestamp = gstate.timestamp
if timestamp is None or new_counter != gstate.counter + parsed_samples:
timestamp = now
if verbose:
print("Establishing new time base: " + str(new_counter) + " -> " +
str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))
timestamp -= parsed_samples
for i in range(parsed_samples): for i in range(parsed_samples):
gstate.points.append({ gstate.points.append({
"measurement": "spacex.starlink.user_terminal.history", "measurement": "spacex.starlink.user_terminal.history",
"tags": { "tags": {
"id": gstate.dish_id "id": gstate.dish_id
}, },
"time": timestamp + datetime.timedelta(seconds=i - parsed_samples), "time": datetime.utcfromtimestamp(timestamp),
"fields": {k: v[i] for k, v in bulk.items()}, "fields": {k: v[i] for k, v in bulk.items() if v[i] is not None},
}) })
timestamp += 1
gstate.counter = new_counter
gstate.timestamp = timestamp
def process_ping_stats(): def process_ping_stats():
timestamp = datetime.datetime.utcnow() timestamp = time.time()
general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
@ -222,7 +249,7 @@ def main():
"tags": { "tags": {
"id": gstate.dish_id "id": gstate.dish_id
}, },
"time": timestamp, "time": datetime.utcfromtimestamp(timestamp),
"fields": all_stats, "fields": all_stats,
}) })

View file

@ -1,20 +1,55 @@
"""Helpers for grpc communication with a Starlink user terminal. """Helpers for grpc communication with a Starlink user terminal.
This module may eventually contain more expansive parsing logic, but for now This module may eventually contain more expansive parsing logic, but for now
it contains functions to parse the history data for some specific packet loss it contains functions to either get the history data as-is or parse it for
statistics. some specific packet loss statistics.
General statistics: Those functions return data grouped into sets, as follows:
This group of statistics contains data relevant to all the other groups.
General data:
This set of fields contains data relevant to all the other groups.
The sample interval is currently 1 second. The sample interval is currently 1 second.
samples: The number of valid samples analyzed. samples: The number of samples analyzed (for statistics) or returned
current: XXX explain (for bulk data).
current: The total number of data samples that have been written to
the history buffer since dish reboot, irrespective of buffer wrap.
This can be used to keep track of how many samples are new in
comparison to a prior query of the history data.
Bulk history data: Bulk history data:
XXX to be written, but it'll be same as some of the items in status info, This group holds the history data as-is for the requested range of
just as lists for each. samples, just unwound from the circular buffers that the raw data holds.
It contains some of the same fields as the status info, but instead of
representing the current values, each field contains a sequence of values
representing the value over time, ending at the current time.
pop_ping_drop_rate: Fraction of lost ping replies per sample.
pop_ping_latency_ms: Round trip time, in milliseconds, during the
sample period, or None if a sample experienced 100% ping drop.
downlink_throughput_bps: Download usage during the sample period
(actual, not max available), in bits per second.
uplink_throughput_bps: Upload usage during the sample period, in bits
per second.
snr: Signal to noise ratio during the sample period.
scheduled: Boolean indicating whether or not a satellite was scheduled
to be available for transmit/receive during the sample period.
When false, ping drop shows as "No satellites" in the Starlink app.
obstructed: Boolean indicating whether or not the dish determined the
signal between it and the satellite was obstructed during the
sample period. When true, ping drop shows as "Obstructed" in the
Starlink app.
There is no specific data field in the raw history data that directly
correlates with "Other" or "Beta downtime" in the Starlink app (or
whatever it gets renamed to after beta), but empirical evidence suggests
any sample where pop_ping_drop_rate is 1, scheduled is true, and
obstructed is false is counted as "Beta downtime".
Note that neither scheduled=false nor obstructed=true necessarily means
packet loss occurred. Those need to be examined in combination with
pop_ping_drop_rate to be meaningful.
General ping drop (packet loss) statistics: General ping drop (packet loss) statistics:
This group of statistics characterizes the packet loss (labeled "ping drop" This group of statistics characterizes the packet loss (labeled "ping drop"
@ -55,18 +90,18 @@ Ping drop run length statistics:
end of the sample set that experienced 100% ping drop. This end of the sample set that experienced 100% ping drop. This
period may continue as a run beyond the end of the sample set, so period may continue as a run beyond the end of the sample set, so
is not counted in the following stats. is not counted in the following stats.
run_seconds: A 60 element list. Each element records the total amount run_seconds: A 60 element sequence. Each element records the total
of time, in sample intervals, that experienced 100% ping drop in amount of time, in sample intervals, that experienced 100% ping
a consecutive run that lasted for (list index + 1) sample drop in a consecutive run that lasted for (index + 1) sample
intervals (seconds). That is, the first element contains time intervals (seconds). That is, the first element contains time
spent in 1 sample runs, the second element contains time spent in spent in 1 sample runs, the second element contains time spent in
2 sample runs, etc. 2 sample runs, etc.
run_minutes: A 60 element list. Each element records the total amount run_minutes: A 60 element sequence. Each element records the total
of time, in sample intervals, that experienced 100% ping drop in amount of time, in sample intervals, that experienced 100% ping
a consecutive run that lasted for more than (list index + 1) drop in a consecutive run that lasted for more than (index + 1)
multiples of 60 sample intervals (minutes), but less than or equal multiples of 60 sample intervals (minutes), but less than or equal
to (list index + 2) multiples of 60 sample intervals. Except for to (index + 2) multiples of 60 sample intervals. Except for the
the last element in the list, which records the total amount of last element in the sequence, which records the total amount of
time in runs of more than 60*60 samples. time in runs of more than 60*60 samples.
No sample should be counted in more than one of the run length stats or No sample should be counted in more than one of the run length stats or
@ -135,7 +170,7 @@ def history_ping_field_names():
"""Return the field names of the packet loss stats. """Return the field names of the packet loss stats.
Returns: Returns:
A tuple with 3 lists, the first with general stat names, the second A tuple with 3 lists, the first with general data names, the second
with ping drop stat names, and the third with ping drop run length with ping drop stat names, and the third with ping drop run length
stat names. stat names.
""" """
@ -171,9 +206,7 @@ def get_history():
return response.dish_get_history return response.dish_get_history
def compute_sample_range(history, parse_samples, verbose=False): def _compute_sample_range(history, parse_samples, start=None, verbose=False):
# 'current' is the count of data samples written to the ring buffer,
# irrespective of buffer wrap.
current = int(history.current) current = int(history.current)
samples = len(history.pop_ping_drop_rate) samples = len(history.pop_ping_drop_rate)
@ -186,30 +219,72 @@ def compute_sample_range(history, parse_samples, verbose=False):
if verbose: if verbose:
print("Valid samples: " + str(samples)) print("Valid samples: " + str(samples))
# This is ring buffer offset, so both index to oldest data sample and
# index to next data sample after the newest one.
offset = current % samples
if parse_samples < 0 or samples < parse_samples: if parse_samples < 0 or samples < parse_samples:
parse_samples = samples parse_samples = samples
# Parse the most recent parse_samples-sized set of samples. This will if start is not None and start > current:
# iterate samples in order from oldest to newest. if verbose:
if parse_samples <= offset: print("Counter reset detected, ignoring requested start count")
sample_range = range(offset - parse_samples, offset) start = None
if start is None or start < current - parse_samples:
start = current - parse_samples
# This is ring buffer offset, so both index to oldest data sample and
# index to next data sample after the newest one.
end_offset = current % samples
start_offset = start % samples
# Set the range for the requested set of samples. This will iterate
# sample index in order from oldest to newest.
if start_offset < end_offset:
sample_range = range(start_offset, end_offset)
else: else:
sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) sample_range = chain(range(start_offset, samples), range(0, end_offset))
return sample_range, parse_samples, current return sample_range, current - start, current
def history_bulk_data(parse_samples, verbose=False): def history_bulk_data(parse_samples, start=None, verbose=False):
"""Fetch history data for a range of samples.
Args:
parse_samples (int): Number of samples to process, or -1 to parse all
available samples (bounded by start, if it is set).
start (int): Optional. If set, the samples returned will be limited to
the ones that have a counter value greater than or equal to this
value. The "current" field in the general data dict returned by
this function represents the counter value of the next data sample
after the returned data, so if that value is passed as start in a
subsequent call to this function, only new samples will be
returned.
NOTE: The sample counter will reset to 0 when the dish reboots. If
the requested start value is greater than the current "current"
value, this function will assume that happened and treat all
samples as being later than the requested start, and thus include
them (bounded by parse_samples, if it is not -1).
Setting parse_samples to other than -1 and setting start to other than None is
not recommended, as doing so will not guarantee that all new
samples are included in the results.
verbose (bool): Optionally produce verbose output.
Returns:
A tuple with 2 dicts, the first mapping general data names to their
values and the second mapping bulk history data names to their values.
Raises:
GrpcError: Failed getting history info from the Starlink user
terminal.
"""
try: try:
history = get_history() history = get_history()
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e)
sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose) sample_range, parsed_samples, current = _compute_sample_range(history,
parse_samples,
start=start,
verbose=verbose)
pop_ping_drop_rate = [] pop_ping_drop_rate = []
pop_ping_latency_ms = [] pop_ping_latency_ms = []
@ -221,7 +296,8 @@ def history_bulk_data(parse_samples, verbose=False):
for i in sample_range: for i in sample_range:
pop_ping_drop_rate.append(history.pop_ping_drop_rate[i]) pop_ping_drop_rate.append(history.pop_ping_drop_rate[i])
pop_ping_latency_ms.append(history.pop_ping_latency_ms[i]) pop_ping_latency_ms.append(
history.pop_ping_latency_ms[i] if history.pop_ping_drop_rate[i] < 1 else None)
downlink_throughput_bps.append(history.downlink_throughput_bps[i]) downlink_throughput_bps.append(history.downlink_throughput_bps[i])
uplink_throughput_bps.append(history.uplink_throughput_bps[i]) uplink_throughput_bps.append(history.uplink_throughput_bps[i])
snr.append(history.snr[i]) snr.append(history.snr[i])
@ -229,7 +305,7 @@ def history_bulk_data(parse_samples, verbose=False):
obstructed.append(history.obstructed[i]) obstructed.append(history.obstructed[i])
return { return {
"samples": parse_samples, "samples": parsed_samples,
"current": current, "current": current,
}, { }, {
"pop_ping_drop_rate": pop_ping_drop_rate, "pop_ping_drop_rate": pop_ping_drop_rate,
@ -251,7 +327,7 @@ def history_ping_stats(parse_samples, verbose=False):
verbose (bool): Optionally produce verbose output. verbose (bool): Optionally produce verbose output.
Returns: Returns:
A tuple with 3 dicts, the first mapping general stat names to their A tuple with 3 dicts, the first mapping general data names to their
values, the second mapping ping drop stat names to their values and values, the second mapping ping drop stat names to their values and
the third mapping ping drop run length stat names to their values. the third mapping ping drop run length stat names to their values.
@ -264,7 +340,9 @@ def history_ping_stats(parse_samples, verbose=False):
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e)
sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose) sample_range, parse_samples, current = _compute_sample_range(history,
parse_samples,
verbose=verbose)
tot = 0.0 tot = 0.0
count_full_drop = 0 count_full_drop = 0