Tweaks/fixes mostly related to --poll-loops

Adjust how the --poll-loops option handles the first set of polled loops for history stats, to make that set of data run against a number of samples (and therefore total time interval) that more closely matches subsequent sets of polled loops. This especially applies to the case where the stats are resuming from a prior counter, where without this logic, the first set had an unintuitively large number of samples.
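
As a worked example (hypothetical numbers; variable names as in the dish_common.py diff below): with --loop-interval 60 and --poll-loops 5, history stats normally cover 5 loops, about 300 samples at 1 sample/second. If the script resumes from a saved counter and finds a 130-sample backlog:

    # resume poll: 130-sample backlog caught up from the saved counter
    new_samples = 130
    poll_count = max(0, int((new_samples - 1) / 60))  # -> 2 of the 5 loops "done"
    # Stats are computed once poll_count reaches poll_loops - 1, so the first
    # set is emitted 2 polls later with ~250 samples, instead of 4 polls later
    # with ~370 samples; subsequent sets cover the normal ~300 samples.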

Fix timestamp reporting for history stats to better reflect the actual accumulated data in cases where later polls of the history data had failed. This introduces the potential for the status data and the history stats data to have inconsistent timestamps, and required moving timestamp collection from the individual output scripts into dish_common. Rather than deal with the complication this would create for CSV output, where there is only one timestamp field, just disallow (for CSV output only) the combination of options that could result in different timestamps.
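
In the output scripts this reduces to picking the timestamp that matches each record's category; a minimal sketch of the pattern applied in the diffs below (callback names as in the InfluxDB script):

    # get_data now returns separate timestamps for status and history stats
    rc, status_ts, hist_ts = dish_common.get_data(opts, gstate, cb_add_item,
                                                  cb_add_sequence,
                                                  flush_history=shutdown)
    for category in fields:
        if fields[category]:
            # history stats keep the timestamp of the last successful history
            # poll, which can be earlier than the status timestamp
            timestamp = status_ts if category == "status" else hist_ts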

Fix an error case where poll loop counting was not being reset correctly.
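
Concretely (see the dish_common.py diff below), the reset now happens before the all-polls-failed early return, so a cycle in which every poll failed no longer leaves the count stuck at its maximum:

    gstate.poll_count = 0

    # this check previously came first, which skipped the reset
    if gstate.accum_history is None:
        return (0, None) if flush_history else (1, None)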

Fix explicit --samples option to be honored when resuming from counter for history stats.
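
This works through the existing skip_query flag; a sketch of the two cooperating pieces from the diffs below:

    # dish_common.run_arg_parser: an explicit --samples now sets the flag
    opts.skip_query = True

    # dish_grpc_sqlite.loop_body: the start-counter query honors the flag, so
    # the explicit sample count is used instead of resuming from the counter
    if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
        _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
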
author sparky8512 2022-02-20 13:39:07 -08:00
parent c0f7dd5096
commit 09b8717131
6 changed files with 99 additions and 57 deletions

dish_common.py

@@ -132,6 +132,9 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False):
         opts.samples = int(opts.loop_interval) if opts.loop_interval >= 1.0 else -1
         opts.bulk_samples = -1
     else:
+        # for scripts that query starting history counter, skip it if samples
+        # was explicitly set
+        opts.skip_query = True
         opts.bulk_samples = opts.samples

     opts.no_stdout_errors = no_stdout_errors
@@ -153,15 +156,17 @@ def conn_error(opts, msg, *args):
 class GlobalState:
     """A class for keeping state across loop iterations."""

     def __init__(self, target=None):
-        # counter for bulk_history:
+        # counter, timestamp for bulk_history:
         self.counter = None
-        # counter for history stats:
-        self.counter_stats = None
         self.timestamp = None
+        # counter, timestamp for history stats:
+        self.counter_stats = None
+        self.timestamp_stats = None
         self.dish_id = None
         self.context = starlink_grpc.ChannelContext(target=target)
         self.poll_count = 0
         self.accum_history = None
+        self.first_poll = True

     def shutdown(self):
         self.context.close()
@@ -195,23 +200,32 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=
         would otherwise be lost on script restart.

     Returns:
-        1 if there were any failures getting data from the dish, otherwise 0.
+        Tuple with 3 values. The first value is 1 if there were any failures
+        getting data from the dish, otherwise 0. The second value is an int
+        timestamp for status data (data with category "status"), or None if
+        no status data was reported. The third value is an int timestamp for
+        history stats data (non-bulk data with category other than "status"),
+        or None if no history stats data was reported.
     """
     if flush_history and opts.poll_loops < 2:
-        return 0
+        return 0, None, None

     rc = 0
+    status_ts = None
+    hist_ts = None

     if not flush_history:
-        rc = get_status_data(opts, gstate, add_item, add_sequence)
+        rc, status_ts = get_status_data(opts, gstate, add_item, add_sequence)

-    if opts.history_stats_mode and not rc:
-        rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history)
+    if opts.history_stats_mode and (not rc or opts.poll_loops > 1):
+        hist_rc, hist_ts = get_history_stats(opts, gstate, add_item, add_sequence, flush_history)
+        if not rc:
+            rc = hist_rc

     if not flush_history and opts.bulk_mode and add_bulk and not rc:
         rc = get_bulk_data(opts, gstate, add_bulk)

-    return rc
+    return rc, status_ts, hist_ts


 def add_data_normal(data, category, add_item, add_sequence):
@@ -237,6 +251,7 @@ def add_data_numeric(data, category, add_item, add_sequence):
 def get_status_data(opts, gstate, add_item, add_sequence):
     if opts.satus_mode:
+        timestamp = int(time.time())
         try:
             groups = starlink_grpc.status_data(context=gstate.context)
             status_data, obstruct_detail, alert_detail = groups[0:3]
@@ -244,13 +259,13 @@ def get_status_data(opts, gstate, add_item, add_sequence):
             if "status" in opts.mode:
                 if opts.need_id and gstate.dish_id is None:
                     conn_error(opts, "Dish unreachable and ID unknown, so not recording state")
-                    return 1
+                    return 1, None
                 if opts.verbose:
                     print("Dish unreachable")
                 add_item("state", "DISH_UNREACHABLE", "status")
-                return 0
+                return 0, timestamp
             conn_error(opts, "Failure getting status: %s", str(e))
-            return 1
+            return 1, None
         if opts.need_id:
             gstate.dish_id = status_data["id"]
             del status_data["id"]
@@ -261,25 +276,28 @@ def get_status_data(opts, gstate, add_item, add_sequence):
             add_data(obstruct_detail, "status", add_item, add_sequence)
         if "alert_detail" in opts.mode:
             add_data(alert_detail, "status", add_item, add_sequence)
+        return 0, timestamp
     elif opts.need_id and gstate.dish_id is None:
         try:
             gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
         except starlink_grpc.GrpcError as e:
             conn_error(opts, "Failure getting dish ID: %s", str(e))
-            return 1
+            return 1, None
         if opts.verbose:
             print("Using dish ID: " + gstate.dish_id)

-    return 0
+    return 0, None


 def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
     """Fetch history stats. See `get_data` for details."""
-    if flush_history:
+    if flush_history or (opts.need_id and gstate.dish_id is None):
         history = None
     else:
         try:
+            timestamp = int(time.time())
             history = starlink_grpc.get_history(context=gstate.context)
+            gstate.timestamp_stats = timestamp
         except grpc.RpcError as e:
             conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
             history = None
@@ -306,17 +324,28 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
     else:
         gstate.accum_history = history

+    # When resuming from prior count with --poll-loops set, advance the loop
+    # count by however many loops worth of data was caught up on. This helps
+    # avoid abnormally large sample counts in the first set of output data.
+    if gstate.first_poll and gstate.accum_history:
+        if opts.poll_loops > 1 and gstate.counter_stats:
+            new_samples = gstate.accum_history.current - gstate.counter_stats
+            if new_samples < 0:
+                new_samples = gstate.accum_history.current
+            if new_samples > len(gstate.accum_history.pop_ping_drop_rate):
+                new_samples = len(gstate.accum_history.pop_ping_drop_rate)
+            gstate.poll_count = max(gstate.poll_count, int((new_samples-1) / opts.loop_interval))
+        gstate.first_poll = False
+
     if gstate.poll_count < opts.poll_loops - 1 and not flush_history:
         gstate.poll_count += 1
-        return 0
+        return 0, None

-    # This can happen if all polling attempts failed. Verbose output has
-    # already happened, so just return.
-    if gstate.accum_history is None:
-        return 0 if flush_history else 1
-
     gstate.poll_count = 0

+    if gstate.accum_history is None:
+        return (0, None) if flush_history else (1, None)
+
     groups = starlink_grpc.history_stats(parse_samples,
                                          start=start,
                                          verbose=opts.verbose,
@@ -337,9 +366,11 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
     if not opts.no_counter:
         gstate.counter_stats = general["end_counter"]

+    timestamp = gstate.timestamp_stats
+    gstate.timestamp_stats = None
     gstate.accum_history = None

-    return 0
+    return 0, timestamp


 def get_bulk_data(opts, gstate, add_bulk):

dish_grpc_influx.py

@@ -51,7 +51,8 @@ def handle_sigterm(signum, frame):
 def parse_args():
-    parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 1.x database")
+    parser = dish_common.create_arg_parser(
+        output_description="write it to an InfluxDB 1.x database")

     group = parser.add_argument_group(title="InfluxDB 1.x database options")

     group.add_argument("-n",
@@ -250,24 +251,24 @@ def loop_body(opts, gstate, shutdown=False):
                 # save off counter value for script restart
                 points[-1]["fields"]["counter"] = counter + count

-    now = time.time()
-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_add_item,
-                              cb_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_add_item,
+                                                  cb_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)
     if rc:
         return rc

     for category in fields:
         if fields[category]:
+            timestamp = status_ts if category == "status" else hist_ts
             gstate.points.append({
                 "measurement": "spacex.starlink.user_terminal." + category,
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": int(now),
+                "time": timestamp,
                 "fields": fields[category],
             })

dish_grpc_influx2.py

@@ -241,24 +241,24 @@ def loop_body(opts, gstate, shutdown=False):
                 # save off counter value for script restart
                 points[-1]["fields"]["counter"] = counter + count

-    now = time.time()
-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_add_item,
-                              cb_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_add_item,
+                                                  cb_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)
     if rc:
         return rc

     for category in fields:
         if fields[category]:
+            timestamp = status_ts if category == "status" else hist_ts
             gstate.points.append({
                 "measurement": "spacex.starlink.user_terminal." + category,
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": int(now),
+                "time": timestamp,
                 "fields": fields[category],
             })

dish_grpc_mqtt.py

@@ -100,7 +100,7 @@ def loop_body(opts, gstate):
             ("starlink/dish_{0}/{1}/{2}".format(category, gstate.dish_id,
                                                 key), ",".join(str(x) for x in val), 0, False))

-    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)
+    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)[0]

     if msgs:
         try:

dish_grpc_sqlite.py

@@ -122,16 +122,20 @@ def loop_body(opts, gstate, shutdown=False):
             row.append(counter)
             hist_rows.append(row)

-    now = int(time.time())
     rc = 0
+    status_ts = None
+    hist_ts = None

     if not shutdown:
-        rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
+        rc, status_ts = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)

-    if opts.history_stats_mode and not rc:
+    if opts.history_stats_mode and (not rc or opts.poll_loops > 1):
         if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
             _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
-        rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown)
+        hist_rc, hist_ts = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence,
+                                                         shutdown)
+        if not rc:
+            rc = hist_rc

     if not shutdown and opts.bulk_mode and not rc:
         if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
@@ -144,11 +148,12 @@ def loop_body(opts, gstate, shutdown=False):
     cur = gstate.sql_conn.cursor()
     for category, fields in tables.items():
         if fields:
+            timestamp = status_ts if category == "status" else hist_ts
             sql = 'INSERT OR REPLACE INTO "{0}" ("time","id",{1}) VALUES ({2})'.format(
                 category, ",".join('"' + x + '"' for x in fields),
                 ",".join(repeat("?",
                                 len(fields) + 2)))
-            values = [now, gstate.dish_id]
+            values = [timestamp, gstate.dish_id]
             values.extend(fields.values())
             cur.execute(sql, values)
             rows_written += 1

dish_grpc_text.py

@@ -93,6 +93,13 @@ def parse_args():
     if (opts.history_stats_mode or opts.satus_mode) and opts.bulk_mode and not opts.verbose:
         parser.error("bulk_history cannot be combined with other modes for CSV output")

+    # Technically possible, but a pain to implement, so just disallow it. User
+    # probably doesn't realize how weird it would be, anyway, given that stats
+    # data reports at a different rate from status data in this case.
+    if opts.history_stats_mode and opts.satus_mode and not opts.verbose and opts.poll_loops > 1:
+        parser.error("usage of --poll-loops with history stats modes cannot be mixed with status "
+                     "modes for CSV output")
+
     opts.skip_query |= opts.no_counter | opts.verbose

     if opts.out_file == "-":
         opts.no_stdout_errors = True
@@ -179,10 +186,7 @@ def get_prior_counter(opts, gstate):
 def loop_body(opts, gstate, print_file, shutdown=False):
-    if opts.verbose:
-        csv_data = []
-    else:
-        csv_data = [datetime.utcnow().replace(microsecond=0).isoformat()]
+    csv_data = []

     def xform(val):
         return "" if val is None else str(val)
@@ -224,12 +228,12 @@ def loop_body(opts, gstate, print_file, shutdown=False):
             fields.extend([xform(val[i]) for val in bulk.values()])
             print(",".join(fields), file=print_file)

-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_data_add_item,
-                              cb_data_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_data_add_item,
+                                                  cb_data_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)

     if opts.verbose:
         if csv_data:
@@ -237,8 +241,9 @@ def loop_body(opts, gstate, print_file, shutdown=False):
         if opts.loop_interval > 0.0:
             print(file=print_file)
     else:
-        # skip if only timestamp
-        if len(csv_data) > 1:
+        if csv_data:
+            timestamp = status_ts if status_ts is not None else hist_ts
+            csv_data.insert(0, datetime.utcfromtimestamp(timestamp).isoformat())
             print(",".join(csv_data), file=print_file)

     return rc