Resume from counter for history stats CSV output

Add an option to output to a specified file instead of standard output (which is still the default), and if set, attempt to read prior end counter for use in resuming history stats computation at that point. This behavior can be disabled using the --skip-query (-k) option.

Resuming will only work for CSV files that start with a header line that matches the last line in the file, and is currently only enabled for history stats, not bulk history, because the file read operation is not at all optimized for large files. (And because I don't think anyone is really using CSV for recording bulk history data, I only implemented that because it was easy to do so and helps with testing.)

While testing this, I realized that the implementation of the --poll-loops option has an awkward interaction with resuming from prior counter value, but that impacts all the scripts that support resuming from counter, so I will address that in a subsequent change.
This commit is contained in:
sparky8512 2022-02-19 15:51:46 -08:00
parent 0225a9783c
commit 32567059b8

View file

@ -2,8 +2,8 @@
"""Output Starlink user terminal data info in text format. """Output Starlink user terminal data info in text format.
This script pulls the current status info and/or metrics computed from the This script pulls the current status info and/or metrics computed from the
history data and prints them to stdout either once or in a periodic loop. history data and prints them to a file or stdout either once or in a periodic
By default, it will print the results in CSV format. loop. By default, it will print the results in CSV format.
Note that using this script to record the alert_detail group mode as CSV Note that using this script to record the alert_detail group mode as CSV
data is not recommended, because the number of alerts and their relative data is not recommended, because the number of alerts and their relative
@ -13,6 +13,7 @@ the alert_detail mode, you can use the alerts bitmask in the status group.
from datetime import datetime from datetime import datetime
import logging import logging
import os
import signal import signal
import sys import sys
import time import time
@ -20,6 +21,7 @@ import time
import dish_common import dish_common
import starlink_grpc import starlink_grpc
COUNTER_FIELD = "end_counter"
VERBOSE_FIELD_MAP = { VERBOSE_FIELD_MAP = {
# status fields (the remainder are either self-explanatory or I don't # status fields (the remainder are either self-explanatory or I don't
# know with confidence what they mean) # know with confidence what they mean)
@ -69,24 +71,43 @@ def handle_sigterm(signum, frame):
def parse_args(): def parse_args():
parser = dish_common.create_arg_parser( parser = dish_common.create_arg_parser(
output_description= output_description="print it in text format; by default, will print in CSV format")
"print it to standard output in text format; by default, will print in CSV format")
group = parser.add_argument_group(title="CSV output options") group = parser.add_argument_group(title="CSV output options")
group.add_argument("-H", group.add_argument("-H",
"--print-header", "--print-header",
action="store_true", action="store_true",
help="Print CSV header instead of parsing data") help="Print CSV header instead of parsing data")
group.add_argument("-O",
"--out-file",
default="-",
help="Output file path; if set, can also be used to resume from prior "
"history sample counter, default: write to standard output")
group.add_argument("-k",
"--skip-query",
action="store_true",
help="Skip querying for prior sample write point in history modes")
opts = dish_common.run_arg_parser(parser, no_stdout_errors=True) opts = dish_common.run_arg_parser(parser)
if (opts.history_stats_mode or opts.satus_mode) and opts.bulk_mode: if (opts.history_stats_mode or opts.satus_mode) and opts.bulk_mode and not opts.verbose:
parser.error("bulk_history cannot be combined with other modes for CSV output") parser.error("bulk_history cannot be combined with other modes for CSV output")
opts.skip_query |= opts.no_counter | opts.verbose
if opts.out_file == "-":
opts.no_stdout_errors = True
return opts return opts
def print_header(opts): def open_out_file(opts, mode):
if opts.out_file == "-":
# open new file, so it can be closed later without affecting sys.stdout
return os.fdopen(sys.stdout.fileno(), "w", closefd=False)
return open(opts.out_file, mode, buffering=1)
def print_header(opts, print_file):
header = ["datetimestamp_utc"] header = ["datetimestamp_utc"]
def header_add(names): def header_add(names):
@ -115,7 +136,6 @@ def print_header(opts):
if opts.bulk_mode: if opts.bulk_mode:
general, bulk = starlink_grpc.history_bulk_field_names() general, bulk = starlink_grpc.history_bulk_field_names()
header_add(general)
header_add(bulk) header_add(bulk)
if opts.history_stats_mode: if opts.history_stats_mode:
@ -133,11 +153,32 @@ def print_header(opts):
if "usage" in opts.mode: if "usage" in opts.mode:
header_add(usage) header_add(usage)
print(",".join(header)) print(",".join(header), file=print_file)
return 0 return 0
def loop_body(opts, gstate, shutdown=False): def get_prior_counter(opts, gstate):
# This implementation is terrible in that it makes a bunch of assumptions.
# Those assumptions should be true for files generated by this script, but
# it would be better not to make them. However, it also only works if the
# CSV file has a header that correctly matches the last line of the file,
# and there's really no way to verify that, so it's garbage in, garbage
# out, anyway. It also reads the entire file line-by-line, which is not
# great.
try:
with open_out_file(opts, "r") as csv_file:
header = csv_file.readline().split(",")
column = header.index(COUNTER_FIELD)
last_line = None
for last_line in csv_file:
pass
if last_line is not None:
gstate.counter_stats = int(last_line.split(",")[column])
except (IndexError, OSError, ValueError):
pass
def loop_body(opts, gstate, print_file, shutdown=False):
if opts.verbose: if opts.verbose:
csv_data = [] csv_data = []
else: else:
@ -169,17 +210,19 @@ def loop_body(opts, gstate, shutdown=False):
if opts.verbose: if opts.verbose:
print("Time range (UTC): {0} -> {1}".format( print("Time range (UTC): {0} -> {1}".format(
datetime.utcfromtimestamp(timestamp).isoformat(), datetime.utcfromtimestamp(timestamp).isoformat(),
datetime.utcfromtimestamp(timestamp + count).isoformat())) datetime.utcfromtimestamp(timestamp + count).isoformat()),
file=print_file)
for key, val in bulk.items(): for key, val in bulk.items():
print("{0:22} {1}".format(key + ":", ", ".join(xform(subval) for subval in val))) print("{0:22} {1}".format(key + ":", ", ".join(xform(subval) for subval in val)),
file=print_file)
if opts.loop_interval > 0.0: if opts.loop_interval > 0.0:
print() print(file=print_file)
else: else:
for i in range(count): for i in range(count):
timestamp += 1 timestamp += 1
fields = [datetime.utcfromtimestamp(timestamp).isoformat()] fields = [datetime.utcfromtimestamp(timestamp).isoformat()]
fields.extend([xform(val[i]) for val in bulk.values()]) fields.extend([xform(val[i]) for val in bulk.values()])
print(",".join(fields)) print(",".join(fields), file=print_file)
rc = dish_common.get_data(opts, rc = dish_common.get_data(opts,
gstate, gstate,
@ -190,13 +233,13 @@ def loop_body(opts, gstate, shutdown=False):
if opts.verbose: if opts.verbose:
if csv_data: if csv_data:
print("\n".join(csv_data)) print("\n".join(csv_data), file=print_file)
if opts.loop_interval > 0.0: if opts.loop_interval > 0.0:
print() print(file=print_file)
else: else:
# skip if only timestamp # skip if only timestamp
if len(csv_data) > 1: if len(csv_data) > 1:
print(",".join(csv_data)) print(",".join(csv_data), file=print_file)
return rc return rc
@ -207,16 +250,29 @@ def main():
logging.basicConfig(format="%(levelname)s: %(message)s") logging.basicConfig(format="%(levelname)s: %(message)s")
if opts.print_header: if opts.print_header:
rc = print_header(opts) try:
with open_out_file(opts, "a") as print_file:
rc = print_header(opts, print_file)
except OSError as e:
logging.error("Failed opening output file: %s", str(e))
rc = 1
sys.exit(rc) sys.exit(rc)
gstate = dish_common.GlobalState(target=opts.target) gstate = dish_common.GlobalState(target=opts.target)
if opts.out_file != "-" and not opts.skip_query and opts.history_stats_mode:
get_prior_counter(opts, gstate)
try:
print_file = open_out_file(opts, "a")
except OSError as e:
logging.error("Failed opening output file: %s", str(e))
sys.exit(1)
signal.signal(signal.SIGTERM, handle_sigterm) signal.signal(signal.SIGTERM, handle_sigterm)
try: try:
next_loop = time.monotonic() next_loop = time.monotonic()
while True: while True:
rc = loop_body(opts, gstate) rc = loop_body(opts, gstate, print_file)
if opts.loop_interval > 0.0: if opts.loop_interval > 0.0:
now = time.monotonic() now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now) next_loop = max(next_loop + opts.loop_interval, now)
@ -226,7 +282,8 @@ def main():
except Terminated: except Terminated:
pass pass
finally: finally:
loop_body(opts, gstate, shutdown=True) loop_body(opts, gstate, print_file, shutdown=True)
print_file.close()
gstate.shutdown() gstate.shutdown()
sys.exit(rc) sys.exit(rc)