sparky8512 d603272d90 Add option to emit booleans as numeric values
New command line option, -N or --numeric, that will cause all boolean values, including those in sequences/arrays, to be written as 1 or 0 instead of True or False.

Per request in issue 26.

WARNING: Use or non-use of this option with the database output scripts will change the schema of the data. sqlite doesn't care about that, because it stores booleans as integers, anyway, but InfluxDB will trip an error if you try to record data points with this option to a database that has data point recorded without it, or vice versa.
2021-05-23 18:45:18 -07:00

366 lines
15 KiB

"""Shared code among the dish_grpc_* commands
This module is not intended to be generically useful or to export a stable
interface. Rather, it should be considered an implementation detail of the
other scripts, and will change as needed.
For a module that exports an interface intended for general use, see
import argparse
from datetime import datetime
from datetime import timezone
import logging
import re
import time
import grpc
import starlink_grpc
BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$")
STATUS_MODES = ["status", "obstruction_detail", "alert_detail"]
"ping_drop", "ping_run_length", "ping_latency", "ping_loaded_latency", "usage"
def create_arg_parser(output_description, bulk_history=True):
"""Create an argparse parser and add the common command line options."""
parser = argparse.ArgumentParser(
description="Collect status and/or history data from a Starlink user terminal and " +
epilog="Additional arguments can be read from a file by including @FILENAME as an "
"option, where FILENAME is a path to a file that contains arguments, one per line.",
# need to remember this for later
parser.bulk_history = bulk_history
group = parser.add_argument_group(title="General options")
help="host:port of dish to query, default is the standard IP address "
"and port (")
group.add_argument("-h", "--help", action="help", help="Be helpful")
help="Record boolean values as 1 and 0 instead of True and False")
help="Loop interval in seconds or 0 for no loop, default: " +
group.add_argument("-v", "--verbose", action="store_true", help="Be verbose")
group = parser.add_argument_group(title="History mode options")
help="Parse all valid samples")
help="Poll history for N loops or until reboot detected, before computing "
"history stats; this allows for a smaller loop interval with less loss of "
"data when the dish reboots",
if bulk_history:
sample_help = ("Number of data samples to parse; normally applies to first loop "
"iteration only, default: all in bulk mode, loop interval if loop "
"interval set, else " + str(SAMPLES_DEFAULT))
no_counter_help = ("Don't track sample counter across loop iterations in non-bulk "
"modes; keep using samples option value instead")
sample_help = ("Number of data samples to parse; normally applies to first loop "
"iteration only, default: loop interval, if set, else " +
no_counter_help = ("Don't track sample counter across loop iterations; keep using "
"samples option value instead")
group.add_argument("-s", "--samples", type=int, help=sample_help)
group.add_argument("-j", "--no-counter", action="store_true", help=no_counter_help)
return parser
def run_arg_parser(parser, need_id=False, no_stdout_errors=False):
"""Run parse_args on a parser previously created with create_arg_parser
need_id (bool): A flag to set in options to indicate whether or not to
set dish_id on the global state object; see get_data for more
no_stdout_errors (bool): A flag set in options to protect stdout from
error messages, in case that's where the data output is going, so
may be being redirected to a file.
An argparse Namespace object with the parsed options set as attributes.
if parser.bulk_history:
help="The data group to record, one or more of: " + ", ".join(all_modes),
opts = parser.parse_args()
if opts.loop_interval <= 0.0 or opts.poll_loops is None:
opts.poll_loops = 1
elif opts.poll_loops < 2:
parser.error("Poll loops arg must be 2 or greater to be meaningful")
# for convenience, set flags for whether any mode in a group is selected
opts.satus_mode = bool(set(STATUS_MODES).intersection(opts.mode))
opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode))
opts.bulk_mode = "bulk_history" in opts.mode
if opts.samples is None:
opts.samples = int(opts.loop_interval *
opts.poll_loops) if opts.loop_interval >= 1.0 else SAMPLES_DEFAULT
opts.bulk_samples = -1
opts.bulk_samples = opts.samples
opts.no_stdout_errors = no_stdout_errors
opts.need_id = need_id
return opts
def conn_error(opts, msg, *args):
"""Indicate an error in an appropriate way."""
# Connection errors that happen in an interval loop are not critical
# failures, but are interesting enough to print in non-verbose mode.
if opts.loop_interval > 0.0 and not opts.no_stdout_errors:
print(msg % args)
logging.error(msg, *args)
class GlobalState:
"""A class for keeping state across loop iterations."""
def __init__(self, target=None):
# counter for bulk_history:
self.counter = None
# counter for history stats:
self.counter_stats = None
self.timestamp = None
self.dish_id = None
self.context = starlink_grpc.ChannelContext(target=target)
self.poll_count = 0
self.prev_history = None
def shutdown(self):
def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
"""Fetch data from the dish, pull it apart and call back with the pieces.
This function uses call backs to return the useful data. If need_id is set
in opts, then it is guaranteed that dish_id will have been set in gstate
prior to any of the call backs being invoked.
opts (object): The options object returned from run_arg_parser.
gstate (GlobalState): An object for keeping track of state across
multiple calls.
add_item (function): Call back for non-sequence data, with prototype:
add_item(name, value, category)
add_sequence (function): Call back for sequence data, with prototype:
add_sequence(name, value, category, start_index_label)
add_bulk (function): Optional. Call back for bulk history data, with
add_bulk(bulk_data, count, start_timestamp, start_counter)
1 if there were any failures getting data from the dish, otherwise 0.
rc = get_status_data(opts, gstate, add_item, add_sequence)
if opts.history_stats_mode and not rc:
rc = get_history_stats(opts, gstate, add_item, add_sequence)
if opts.bulk_mode and add_bulk and not rc:
rc = get_bulk_data(opts, gstate, add_bulk)
return rc
def add_data_normal(data, category, add_item, add_sequence):
for key, val in data.items():
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
if seq is None:
add_item(name, val, category)
add_sequence(name, val, category, int(start) if start else 0)
def add_data_numeric(data, category, add_item, add_sequence):
for key, val in data.items():
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
if seq is None:
add_item(name, int(val) if isinstance(val, int) else val, category)
[int(subval) if isinstance(subval, int) else subval for subval in val],
int(start) if start else 0)
def get_status_data(opts, gstate, add_item, add_sequence):
if opts.satus_mode:
groups = starlink_grpc.status_data(context=gstate.context)
status_data, obstruct_detail, alert_detail = groups[0:3]
except starlink_grpc.GrpcError as e:
if "status" in opts.mode:
if opts.need_id and gstate.dish_id is None:
conn_error(opts, "Dish unreachable and ID unknown, so not recording state")
return 1
if opts.verbose:
print("Dish unreachable")
add_item("state", "DISH_UNREACHABLE", "status")
return 0
conn_error(opts, "Failure getting status: %s", str(e))
return 1
if opts.need_id:
gstate.dish_id = status_data["id"]
del status_data["id"]
add_data = add_data_numeric if opts.numeric else add_data_normal
if "status" in opts.mode:
add_data(status_data, "status", add_item, add_sequence)
if "obstruction_detail" in opts.mode:
add_data(obstruct_detail, "status", add_item, add_sequence)
if "alert_detail" in opts.mode:
add_data(alert_detail, "status", add_item, add_sequence)
elif opts.need_id and gstate.dish_id is None:
gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting dish ID: %s", str(e))
return 1
if opts.verbose:
print("Using dish ID: " + gstate.dish_id)
return 0
def get_history_stats(opts, gstate, add_item, add_sequence):
"""Fetch history stats. See `get_data` for details."""
history = starlink_grpc.get_history(context=gstate.context)
except grpc.RpcError as e:
conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
history = gstate.prev_history
if history is None:
return 1
if history and gstate.prev_history and history.current < gstate.prev_history.current:
if opts.verbose:
print("Dish reboot detected. Restarting loop polling count.")
# process saved history data and keep the new data for next time
history, gstate.prev_history = gstate.prev_history, history
# the newly saved data counts as a loop, so advance 1 past reset point
gstate.poll_count = opts.poll_loops - 2
elif gstate.poll_count > 0:
gstate.poll_count -= 1
gstate.prev_history = history
# if no --poll-loops option set, opts.poll_loops gets set to 1, so
# poll_count will always be 0 and prev_history will always be None
gstate.prev_history = None
gstate.poll_count = opts.poll_loops - 1
start = gstate.counter_stats
parse_samples = opts.samples if start is None else -1
groups = starlink_grpc.history_stats(parse_samples,
general, ping, runlen, latency, loaded, usage = groups[0:6]
add_data = add_data_numeric if opts.numeric else add_data_normal
add_data(general, "ping_stats", add_item, add_sequence)
if "ping_drop" in opts.mode:
add_data(ping, "ping_stats", add_item, add_sequence)
if "ping_run_length" in opts.mode:
add_data(runlen, "ping_stats", add_item, add_sequence)
if "ping_latency" in opts.mode:
add_data(latency, "ping_stats", add_item, add_sequence)
if "ping_loaded_latency" in opts.mode:
add_data(loaded, "ping_stats", add_item, add_sequence)
if "usage" in opts.mode:
add_data(usage, "usage", add_item, add_sequence)
if not opts.no_counter:
gstate.counter_stats = general["end_counter"]
return 0
def get_bulk_data(opts, gstate, add_bulk):
"""Fetch bulk data. See `get_data` for details."""
before = time.time()
start = gstate.counter
parse_samples = opts.bulk_samples if start is None else -1
general, bulk = starlink_grpc.history_bulk_data(parse_samples,
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting history: %s", str(e))
return 1
after = time.time()
parsed_samples = general["samples"]
new_counter = general["end_counter"]
timestamp = gstate.timestamp
# check this first, so it doesn't report as lost time sync
if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:
timestamp = None
# Allow up to 2 seconds of time drift before forcibly re-syncing, since
# +/- 1 second can happen just due to scheduler timing.
if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0:
if opts.verbose:
print("Lost sample time sync at: " +
str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc)))
timestamp = None
if timestamp is None:
timestamp = int(before)
if opts.verbose:
print("Establishing new time base: {0} -> {1}".format(
new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc)))
timestamp -= parsed_samples
if opts.numeric:
k: [int(subv) if isinstance(subv, int) else subv for subv in v]
for k, v in bulk.items()
}, parsed_samples, timestamp, new_counter - parsed_samples)
add_bulk(bulk, parsed_samples, timestamp, new_counter - parsed_samples)
gstate.counter = new_counter
gstate.timestamp = timestamp + parsed_samples
return 0