Improvements to how the -o option works

Change the loop polling feature (-o) to aggregate the history data on each polling loop, instead of just keeping the last polled history so it can be logged when a reboot is detected. This allows statistics to be computed across a period longer than the dish's history buffer, which was recently reduced to 15 minutes.

This change also means data is no longer logged immediately when a dish reboot is detected; logging now always happens at the specified interval, whether or not there was a reboot.

Finally, change the poll loop counting so that data is not emitted on the first loop when polling is configured. That behavior made sense when the history buffer was large enough to hold the entire period's worth of data, but now it just results in a short period appearing in the log output every time the script is restarted.
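
Sketched in simplified form, the new loop behavior looks like this (a minimal standalone sketch, not the actual code from this commit; `poll_history`, `concatenate`, and `compute_stats` are hypothetical stand-ins for `starlink_grpc.get_history`, `starlink_grpc.concatenate_history`, and the history stats computation):

```python
def poll_history():
    """Stand-in for starlink_grpc.get_history(); returns None on grpc failure."""
    return {"samples": [0.0]}


def concatenate(accum, latest):
    """Stand-in for starlink_grpc.concatenate_history()."""
    return {"samples": accum["samples"] + latest["samples"]}


def compute_stats(history):
    """Stand-in for the history stats computation and logging."""
    print("stats over", len(history["samples"]), "samples")


accum_history = None
poll_count = 0


def poll_loop_iteration(poll_loops):
    """One pass of the -t timer loop, with -o set to poll_loops."""
    global accum_history, poll_count

    history = poll_history()

    # Aggregate on every loop, even across a dish reboot.
    if accum_history is not None and history is not None:
        accum_history = concatenate(accum_history, history)
    elif history is not None:
        accum_history = history

    # Emit stats only once per poll_loops loops, so logging happens at the
    # specified interval whether or not a reboot occurred in between.
    if poll_count < poll_loops - 1:
        poll_count += 1
        return

    poll_count = 0
    if accum_history is not None:
        compute_stats(accum_history)
        accum_history = None


for _ in range(6):  # e.g. -t 60 -o 3: stats computed on every 3rd poll
    poll_loop_iteration(3)
```

With `poll_loops` set to 3, stats are emitted on every third loop, aggregation continues across reboots, and nothing is emitted on the first loop after startup.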

Fixes #29
sparky8512 2021-09-07 12:02:14 -07:00
parent 41caa76962
commit af940a9727
3 changed files with 108 additions and 27 deletions

README.md

@@ -80,6 +80,12 @@ Some of the scripts (currently only the InfluxDB one) also support specifying op

 A recent (as of 2021-Aug) change in the dish firmware appears to have reduced the amount of history data returned from the most recent 12 hours to the most recent 15 minutes, so if you are using the `-t` option to poll either bulk history or history-based statistics, you should choose an interval less than 900 seconds; otherwise, you will not capture all the data.

+Computing history statistics (one or more of the groups `ping_drop`, `ping_run_length`, `ping_latency`, `ping_loaded_latency`, and `usage`) across periods longer than the 15 minute history buffer may be done by combining the `-t` and `-o` options. The history data will be polled at the interval specified by the `-t` option, but it will be aggregated the number of times specified by the `-o` option, and statistics will be computed against the aggregated data, which will cover a period of the `-t` option value times the `-o` option value. For example, the following:
+```shell script
+python3 dish_grpc_text.py -t 60 -o 60 ping_drop
+```
+will poll history data once per minute, but compute statistics only once per hour. This also reduces data loss due to a dish reboot, since the `-o` option will aggregate across reboots, too.
+
 ### The JSON parser script

 `dish_json_text.py` is similar to `dish_grpc_text.py`, but it takes JSON format input from a file instead of pulling it directly from the dish via grpc call. It also does not support the status info modes, because those are easy enough to interpret directly from the JSON data. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:

dish_common.py

@@ -72,9 +72,9 @@ def create_arg_parser(output_description, bulk_history=True):
     group.add_argument("-o",
                        "--poll-loops",
                        type=int,
-                       help="Poll history for N loops or until reboot detected, before computing "
-                       "history stats; this allows for a smaller loop interval with less loss of "
-                       "data when the dish reboots",
+                       help="Poll history for N loops and aggregate data before computing history "
+                       "stats; this allows for a smaller loop interval with less loss of data "
+                       "when the dish reboots",
                        metavar="N")
     if bulk_history:
         sample_help = ("Number of data samples to parse; normally applies to first loop "
@@ -163,7 +163,7 @@ class GlobalState:
         self.dish_id = None
         self.context = starlink_grpc.ChannelContext(target=target)
         self.poll_count = 0
-        self.prev_history = None
+        self.accum_history = None

     def shutdown(self):
         self.context.close()
@@ -270,33 +270,35 @@ def get_history_stats(opts, gstate, add_item, add_sequence):
         history = starlink_grpc.get_history(context=gstate.context)
     except grpc.RpcError as e:
         conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
-        history = gstate.prev_history
-        if history is None:
-            return 1
+        history = None

-    if history and gstate.prev_history and history.current < gstate.prev_history.current:
-        if opts.verbose:
-            print("Dish reboot detected. Restarting loop polling count.")
-        # process saved history data and keep the new data for next time
-        history, gstate.prev_history = gstate.prev_history, history
-        # the newly saved data counts as a loop, so advance 1 past reset point
-        gstate.poll_count = opts.poll_loops - 2
-    elif gstate.poll_count > 0:
-        gstate.poll_count -= 1
-        gstate.prev_history = history
-        return
+    # Accumulate polled history data into gstate.accum_history, even if there
+    # was a dish reboot.
+    if gstate.accum_history:
+        if history is not None:
+            gstate.accum_history = starlink_grpc.concatenate_history(gstate.accum_history,
+                                                                     history,
+                                                                     verbose=opts.verbose)
     else:
-        # if no --poll-loops option set, opts.poll_loops gets set to 1, so
-        # poll_count will always be 0 and prev_history will always be None
-        gstate.prev_history = None
-        gstate.poll_count = opts.poll_loops - 1
+        gstate.accum_history = history
+
+    if gstate.poll_count < opts.poll_loops - 1:
+        gstate.poll_count += 1
+        return 0
+
+    gstate.poll_count = 0
+
+    # This can happen if all polling attempts failed. Verbose output has
+    # already happened, so just return.
+    if gstate.accum_history is None:
+        return 1

     start = gstate.counter_stats
     parse_samples = opts.samples if start is None else -1
     groups = starlink_grpc.history_stats(parse_samples,
                                          start=start,
                                          verbose=opts.verbose,
-                                         history=history)
+                                         history=gstate.accum_history)
     general, ping, runlen, latency, loaded, usage = groups[0:6]
     add_data = add_data_numeric if opts.numeric else add_data_normal
     add_data(general, "ping_stats", add_item, add_sequence)
@@ -313,6 +315,8 @@ def get_history_stats(opts, gstate, add_item, add_sequence):
     if not opts.no_counter:
         gstate.counter_stats = general["end_counter"]

+    gstate.accum_history = None
+
     return 0

starlink_grpc.py

@@ -60,12 +60,14 @@ This group holds information about the current state of the user terminal.
 it and the satellites with which it communicates.
 : **currently_obstructed** : Most recent sample value. See bulk history data
     for detail.
-: **seconds_obstructed** : The amount of time within the history buffer
-    (currently the smaller of 12 hours or since last reboot), in seconds that
-    the user terminal determined to be obstructed, regardless of whether or
-    not packets were able to be transmitted or received. See also
+: **seconds_obstructed** : The amount of time within the history buffer,
+    in seconds, that the user terminal determined to be obstructed, regardless
+    of whether or not packets were able to be transmitted or received. See also
     *count_obstructed* in general ping drop history data; this value will be
     equal to that value when computed across all available history samples.
+    NOTE: The history buffer is now much smaller than it used to be, so this
+    field is probably either not very useful, or may be computed differently
+    by the user terminal than described above.
 : **obstruction_duration** : Average consecutive time, in seconds, the user
     terminal has detected its signal to be obstructed for a period of time
     that it considers "prolonged", or None if no such obstructions were
@@ -335,6 +337,9 @@ from spacex.api.device import device_pb2
 from spacex.api.device import device_pb2_grpc
 from spacex.api.device import dish_pb2

+HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_throughput_bps",
+                  "uplink_throughput_bps", "snr", "scheduled", "obstructed")
+

 def resolve_imports(channel):
     importer.resolve_lazy_imports(channel)
@@ -356,6 +361,10 @@ class GrpcError(Exception):
         super().__init__(msg, *args, **kwargs)


+class UnwrappedHistory:
+    """Empty class for holding a copy of grpc history data."""
+
+
 class ChannelContext:
     """A wrapper for reusing an open grpc Channel across calls.
@@ -817,6 +826,10 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False):
     if start == current:
         return range(0), 0, current

+    # Not a ring buffer is simple case.
+    if hasattr(history, "unwrapped"):
+        return range(samples - (current-start), samples), current - start, current
+
     # This is ring buffer offset, so both index to oldest data sample and
     # index to next data sample after the newest one.
     end_offset = current % samples
@@ -832,6 +845,64 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False):
     return sample_range, current - start, current


+def concatenate_history(history1, history2, verbose=False):
+    """Append the sample-dependent fields of one history object to another.
+
+    Note:
+        Samples data will be appended regardless of dish reboot or history
+        data ring buffer wrap, which may result in discontiguous sample data
+        with lost data.
+
+    Args:
+        history1: The grpc history object, such as one returned by a prior
+            call to `get_history`, or equivalent dict, to which to append.
+        history2: The grpc history object, such as one returned by a prior
+            call to `get_history`, from which to append.
+        verbose (bool): Optionally produce verbose output.
+
+    Returns:
+        An object with the unwrapped history data and the same attribute
+        fields as a grpc history object.
+    """
+    size2 = len(history2.pop_ping_drop_rate)
+    new_samples = history2.current - history1.current
+    if new_samples < 0:
+        if verbose:
+            print("Dish reboot detected. Appending anyway.")
+        new_samples = history2.current if history2.current < size2 else size2
+    elif new_samples > size2:
+        # This should probably go to stderr and not depend on verbose flag,
+        # but this layer of the code tries not to make that sort of logging
+        # policy decision, so honor requested verbosity.
+        if verbose:
+            print("WARNING: Appending discontiguous samples. Polling interval probably too short.")
+        new_samples = size2
+
+    unwrapped = UnwrappedHistory()
+    for field in HISTORY_FIELDS:
+        setattr(unwrapped, field, [])
+
+    if hasattr(history1, "unwrapped"):
+        # Make a copy so the input object is not modified.
+        for field in HISTORY_FIELDS:
+            getattr(unwrapped, field).extend(getattr(history1, field))
+    else:
+        sample_range, ignore1, ignore2 = _compute_sample_range(  # pylint: disable=unused-variable
+            history1, len(history1.pop_ping_drop_rate))
+        for i in sample_range:
+            for field in HISTORY_FIELDS:
+                getattr(unwrapped, field).append(getattr(history1, field)[i])
+    unwrapped.unwrapped = True
+
+    sample_range, ignore1, ignore2 = _compute_sample_range(history2, new_samples)  # pylint: disable=unused-variable
+    for i in sample_range:
+        for field in HISTORY_FIELDS:
+            getattr(unwrapped, field).append(getattr(history2, field)[i])
+
+    unwrapped.current = history2.current
+    return unwrapped
+
+
 def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None):
     """Fetch history data for a range of samples.