Add option to poll history more frequently

This further complicates the code, for functionality that probably only I care about, but when computing stats for relatively long time intervals, it really hurts when the dish reboots and up to an entire time period's worth of data is lost at exactly the point where it may have been having interesting behavior.
This commit is contained in:
sparky8512 2021-02-19 10:56:20 -08:00
parent 18829bd5cb
commit 38987054b9
2 changed files with 93 additions and 42 deletions

View file

@ -17,6 +17,8 @@ import logging
import re import re
import time import time
import grpc
import starlink_grpc import starlink_grpc
BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$") BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$")
@ -63,6 +65,13 @@ def create_arg_parser(output_description, bulk_history=True):
const=-1, const=-1,
dest="samples", dest="samples",
help="Parse all valid samples") help="Parse all valid samples")
group.add_argument("-o",
"--poll-loops",
type=int,
help="Poll history for N loops or until reboot detected, before computing "
"history stats; this allows for a smaller loop interval with less loss of "
"data when the dish reboots",
metavar="N")
if bulk_history: if bulk_history:
sample_help = ("Number of data samples to parse; normally applies to first loop " sample_help = ("Number of data samples to parse; normally applies to first loop "
"iteration only, default: -1 in bulk mode, loop interval if loop interval " "iteration only, default: -1 in bulk mode, loop interval if loop interval "
@ -106,14 +115,22 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False):
opts = parser.parse_args() opts = parser.parse_args()
if opts.loop_interval <= 0.0 or opts.poll_loops is None:
opts.poll_loops = 1
elif opts.poll_loops < 2:
parser.error("Poll loops arg must be 2 or greater to be meaningful")
# for convenience, set flags for whether any mode in a group is selected # for convenience, set flags for whether any mode in a group is selected
opts.satus_mode = bool(set(STATUS_MODES).intersection(opts.mode)) opts.satus_mode = bool(set(STATUS_MODES).intersection(opts.mode))
opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode)) opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode))
opts.bulk_mode = "bulk_history" in opts.mode opts.bulk_mode = "bulk_history" in opts.mode
if opts.samples is None: if opts.samples is None:
opts.samples = -1 if opts.bulk_mode else int( opts.samples = int(opts.loop_interval *
opts.loop_interval) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT opts.poll_loops) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT
opts.bulk_samples = -1
else:
opts.bulk_samples = opts.samples
opts.no_stdout_errors = no_stdout_errors opts.no_stdout_errors = no_stdout_errors
opts.need_id = need_id opts.need_id = need_id
@ -141,6 +158,8 @@ class GlobalState:
self.timestamp = None self.timestamp = None
self.dish_id = None self.dish_id = None
self.context = starlink_grpc.ChannelContext(target=target) self.context = starlink_grpc.ChannelContext(target=target)
self.poll_count = 0
self.prev_history = None
def shutdown(self): def shutdown(self):
self.context.close() self.context.close()
@ -212,18 +231,52 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
if opts.verbose: if opts.verbose:
print("Using dish ID: " + gstate.dish_id) print("Using dish ID: " + gstate.dish_id)
if not opts.history_stats_mode and not (opts.bulk_mode and add_bulk):
return 0
try:
history = starlink_grpc.get_history(context=gstate.context)
except grpc.RpcError as e:
conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
history = gstate.prev_history
if history is None:
return 1
if opts.history_stats_mode: if opts.history_stats_mode:
get_history_stats(opts, gstate, add_data, history)
if opts.bulk_mode and add_bulk:
return get_bulk_data(opts, gstate, add_bulk)
return 0
def get_history_stats(opts, gstate, add_data, history):
"""Fetch history stats. See `get_data` for details."""
if history and gstate.prev_history and history.current < gstate.prev_history.current:
if opts.verbose:
print("Dish reboot detected. Restarting loop polling count.")
# process saved history data and keep the new data for next time
history, gstate.prev_history = gstate.prev_history, history
# the newly saved data counts as a loop, so advance 1 past reset point
gstate.poll_count = opts.poll_loops - 2
elif gstate.poll_count > 0:
gstate.poll_count -= 1
gstate.prev_history = history
return
else:
# if no --poll-loops option set, opts.poll_loops gets set to 1, so
# poll_count will always be 0 and prev_history will always be None
gstate.prev_history = None
gstate.poll_count = opts.poll_loops - 1
start = gstate.counter_stats start = gstate.counter_stats
parse_samples = opts.samples if start is None else -1 parse_samples = opts.samples if start is None else -1
try:
groups = starlink_grpc.history_stats(parse_samples, groups = starlink_grpc.history_stats(parse_samples,
start=start, start=start,
verbose=opts.verbose, verbose=opts.verbose,
context=gstate.context) history=history)
general, ping, runlen, latency, loaded, usage = groups[0:6] general, ping, runlen, latency, loaded, usage = groups[0:6]
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting ping stats: %s", str(e))
return 1
add_data(general, "ping_stats") add_data(general, "ping_stats")
if "ping_drop" in opts.mode: if "ping_drop" in opts.mode:
add_data(ping, "ping_stats") add_data(ping, "ping_stats")
@ -238,22 +291,13 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
if not opts.no_counter: if not opts.no_counter:
gstate.counter_stats = general["end_counter"] gstate.counter_stats = general["end_counter"]
if opts.bulk_mode and add_bulk:
return get_bulk_data(opts, gstate, add_bulk)
return 0
def get_bulk_data(opts, gstate, add_bulk): def get_bulk_data(opts, gstate, add_bulk):
"""Fetch bulk data. See `get_data` for details. """Fetch bulk data. See `get_data` for details."""
This was split out in case bulk data needs to be handled separately, for
example, if dish_id needs to be known before calling.
"""
before = time.time() before = time.time()
start = gstate.counter start = gstate.counter
parse_samples = opts.samples if start is None else -1 parse_samples = opts.bulk_samples if start is None else -1
try: try:
general, bulk = starlink_grpc.history_bulk_data(parse_samples, general, bulk = starlink_grpc.history_bulk_data(parse_samples,
start=start, start=start,
@ -288,3 +332,4 @@ def get_bulk_data(opts, gstate, add_bulk):
gstate.counter = new_counter gstate.counter = new_counter
gstate.timestamp = timestamp + parsed_samples gstate.timestamp = timestamp + parsed_samples
return 0

View file

@ -797,7 +797,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False):
return sample_range, current - start, current return sample_range, current - start, current
def history_bulk_data(parse_samples, start=None, verbose=False, context=None): def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None):
"""Fetch history data for a range of samples. """Fetch history data for a range of samples.
Args: Args:
@ -818,6 +818,8 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None):
verbose (bool): Optionally produce verbose output. verbose (bool): Optionally produce verbose output.
context (ChannelContext): Optionally provide a channel for reuse context (ChannelContext): Optionally provide a channel for reuse
across repeated calls. across repeated calls.
history: Optionally provide the history data to use instead of fetching
it, from a prior call to `get_history`.
Returns: Returns:
A tuple with 2 dicts, the first mapping general data names to their A tuple with 2 dicts, the first mapping general data names to their
@ -832,6 +834,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None):
GrpcError: Failed getting history info from the Starlink user GrpcError: Failed getting history info from the Starlink user
terminal. terminal.
""" """
if history is None:
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except grpc.RpcError as e:
@ -879,7 +882,7 @@ def history_ping_stats(parse_samples, verbose=False, context=None):
return history_stats(parse_samples, verbose=verbose, context=context)[0:3] return history_stats(parse_samples, verbose=verbose, context=context)[0:3]
def history_stats(parse_samples, start=None, verbose=False, context=None): def history_stats(parse_samples, start=None, verbose=False, context=None, history=None):
"""Fetch, parse, and compute the packet loss stats. """Fetch, parse, and compute the packet loss stats.
Note: Note:
@ -891,6 +894,8 @@ def history_stats(parse_samples, start=None, verbose=False, context=None):
verbose (bool): Optionally produce verbose output. verbose (bool): Optionally produce verbose output.
context (ChannelContext): Optionally provide a channel for reuse context (ChannelContext): Optionally provide a channel for reuse
across repeated calls. across repeated calls.
history: Optionally provide the history data to use instead of fetching
it, from a prior call to `get_history`.
Returns: Returns:
A tuple with 6 dicts, mapping general data names, ping drop stat A tuple with 6 dicts, mapping general data names, ping drop stat
@ -907,6 +912,7 @@ def history_stats(parse_samples, start=None, verbose=False, context=None):
GrpcError: Failed getting history info from the Starlink user GrpcError: Failed getting history info from the Starlink user
terminal. terminal.
""" """
if history is None:
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except grpc.RpcError as e: