From 09b8717131798b7a761680a7b7701df3d95beb42 Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Sun, 20 Feb 2022 13:39:07 -0800
Subject: [PATCH] Tweaks/fixes mostly related to --poll-loops

Adjust how the --poll-loops option handles the first set of polled loops
for history stats, so that set of data covers a number of samples (and
therefore a total time interval) that more closely matches subsequent
sets of polled loops. This especially applies to the case where the
stats are resuming from a prior counter, where, without this logic, the
first set could have an unintuitively large number of samples.

Fix timestamp reporting for history stats to better reflect the actual
accumulated data, for cases where later polls of the history data had
failed. This introduced the potential for the status data and the
history stats data to have inconsistent timestamps, and required the
timestamp collection to be moved from the individual output scripts
into dish_common. Rather than deal with the complication this would
create for CSV output, where there is only 1 timestamp field, just
disallow (for CSV output only) the combination of options that could
result in different timestamps.

Fix an error case where the poll loop count was not being reset
correctly.

Fix the explicit --samples option so it is honored when resuming from a
counter for history stats.
---
 dish_common.py       | 75 +++++++++++++++++++++++++++++++-------------
 dish_grpc_influx.py  | 19 +++++------
 dish_grpc_influx2.py | 16 +++++-----
 dish_grpc_mqtt.py    |  2 +-
 dish_grpc_sqlite.py  | 15 ++++++---
 dish_grpc_text.py    | 29 ++++++++++-------
 6 files changed, 99 insertions(+), 57 deletions(-)

diff --git a/dish_common.py b/dish_common.py
index 54b658b..ced9f4d 100644
--- a/dish_common.py
+++ b/dish_common.py
@@ -132,6 +132,9 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False):
         opts.samples = int(opts.loop_interval) if opts.loop_interval >= 1.0 else -1
         opts.bulk_samples = -1
     else:
+        # for scripts that query the starting history counter, skip it if
+        # samples was explicitly set
+        opts.skip_query = True
         opts.bulk_samples = opts.samples
 
     opts.no_stdout_errors = no_stdout_errors
@@ -153,15 +156,17 @@ def conn_error(opts, msg, *args):
 class GlobalState:
     """A class for keeping state across loop iterations."""
     def __init__(self, target=None):
-        # counter for bulk_history:
+        # counter, timestamp for bulk_history:
         self.counter = None
-        # counter for history stats:
-        self.counter_stats = None
         self.timestamp = None
+        # counter, timestamp for history stats:
+        self.counter_stats = None
+        self.timestamp_stats = None
         self.dish_id = None
         self.context = starlink_grpc.ChannelContext(target=target)
         self.poll_count = 0
         self.accum_history = None
+        self.first_poll = True
 
     def shutdown(self):
         self.context.close()
@@ -195,23 +200,32 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=
         would otherwise be lost on script restart.
 
     Returns:
-        1 if there were any failures getting data from the dish, otherwise 0.
+        Tuple with 3 values. The first value is 1 if there were any failures
+        getting data from the dish, otherwise 0. The second value is an int
+        timestamp for status data (data with category "status"), or None if
+        no status data was reported. The third value is an int timestamp for
+        history stats data (non-bulk data with category other than "status"),
+        or None if no history stats data was reported.
""" if flush_history and opts.poll_loops < 2: - return 0 + return 0, None, None rc = 0 + status_ts = None + hist_ts = None if not flush_history: - rc = get_status_data(opts, gstate, add_item, add_sequence) + rc, status_ts = get_status_data(opts, gstate, add_item, add_sequence) - if opts.history_stats_mode and not rc: - rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history) + if opts.history_stats_mode and (not rc or opts.poll_loops > 1): + hist_rc, hist_ts = get_history_stats(opts, gstate, add_item, add_sequence, flush_history) + if not rc: + rc = hist_rc if not flush_history and opts.bulk_mode and add_bulk and not rc: rc = get_bulk_data(opts, gstate, add_bulk) - return rc + return rc, status_ts, hist_ts def add_data_normal(data, category, add_item, add_sequence): @@ -237,6 +251,7 @@ def add_data_numeric(data, category, add_item, add_sequence): def get_status_data(opts, gstate, add_item, add_sequence): if opts.satus_mode: + timestamp = int(time.time()) try: groups = starlink_grpc.status_data(context=gstate.context) status_data, obstruct_detail, alert_detail = groups[0:3] @@ -244,13 +259,13 @@ def get_status_data(opts, gstate, add_item, add_sequence): if "status" in opts.mode: if opts.need_id and gstate.dish_id is None: conn_error(opts, "Dish unreachable and ID unknown, so not recording state") - return 1 + return 1, None if opts.verbose: print("Dish unreachable") add_item("state", "DISH_UNREACHABLE", "status") - return 0 + return 0, timestamp conn_error(opts, "Failure getting status: %s", str(e)) - return 1 + return 1, None if opts.need_id: gstate.dish_id = status_data["id"] del status_data["id"] @@ -261,25 +276,28 @@ def get_status_data(opts, gstate, add_item, add_sequence): add_data(obstruct_detail, "status", add_item, add_sequence) if "alert_detail" in opts.mode: add_data(alert_detail, "status", add_item, add_sequence) + return 0, timestamp elif opts.need_id and gstate.dish_id is None: try: gstate.dish_id = starlink_grpc.get_id(context=gstate.context) except starlink_grpc.GrpcError as e: conn_error(opts, "Failure getting dish ID: %s", str(e)) - return 1 + return 1, None if opts.verbose: print("Using dish ID: " + gstate.dish_id) - return 0 + return 0, None def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): """Fetch history stats. See `get_data` for details.""" - if flush_history: + if flush_history or (opts.need_id and gstate.dish_id is None): history = None else: try: + timestamp = int(time.time()) history = starlink_grpc.get_history(context=gstate.context) + gstate.timestamp_stats = timestamp except grpc.RpcError as e: conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) history = None @@ -306,17 +324,28 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): else: gstate.accum_history = history + # When resuming from prior count with --poll-loops set, advance the loop + # count by however many loops worth of data was caught up on. This helps + # avoid abnormally large sample counts in the first set of output data. 
+    if gstate.first_poll and gstate.accum_history:
+        if opts.poll_loops > 1 and gstate.counter_stats:
+            new_samples = gstate.accum_history.current - gstate.counter_stats
+            if new_samples < 0:
+                new_samples = gstate.accum_history.current
+            if new_samples > len(gstate.accum_history.pop_ping_drop_rate):
+                new_samples = len(gstate.accum_history.pop_ping_drop_rate)
+            gstate.poll_count = max(gstate.poll_count, int((new_samples - 1) / opts.loop_interval))
+        gstate.first_poll = False
+
     if gstate.poll_count < opts.poll_loops - 1 and not flush_history:
         gstate.poll_count += 1
-        return 0
-
-    # This can happen if all polling attempts failed. Verbose output has
-    # already happened, so just return.
-    if gstate.accum_history is None:
-        return 0 if flush_history else 1
+        return 0, None
 
     gstate.poll_count = 0
 
+    if gstate.accum_history is None:
+        return (0, None) if flush_history else (1, None)
+
     groups = starlink_grpc.history_stats(parse_samples,
                                          start=start,
                                          verbose=opts.verbose,
@@ -337,9 +366,11 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
 
     if not opts.no_counter:
         gstate.counter_stats = general["end_counter"]
+    timestamp = gstate.timestamp_stats
+    gstate.timestamp_stats = None
     gstate.accum_history = None
 
-    return 0
+    return 0, timestamp
 
 
 def get_bulk_data(opts, gstate, add_bulk):
diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py
index c25c38d..81b2766 100644
--- a/dish_grpc_influx.py
+++ b/dish_grpc_influx.py
@@ -51,7 +51,8 @@ def handle_sigterm(signum, frame):
 
 
 def parse_args():
-    parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 1.x database")
+    parser = dish_common.create_arg_parser(
+        output_description="write it to an InfluxDB 1.x database")
 
     group = parser.add_argument_group(title="InfluxDB 1.x database options")
     group.add_argument("-n",
@@ -250,24 +251,24 @@ def loop_body(opts, gstate, shutdown=False):
                 # save off counter value for script restart
                 points[-1]["fields"]["counter"] = counter + count
 
-    now = time.time()
-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_add_item,
-                              cb_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_add_item,
+                                                  cb_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)
     if rc:
         return rc
 
     for category in fields:
         if fields[category]:
+            timestamp = status_ts if category == "status" else hist_ts
             gstate.points.append({
                 "measurement": "spacex.starlink.user_terminal." + category,
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": int(now),
+                "time": timestamp,
                 "fields": fields[category],
             })
 
diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py
index efd482f..ba6f66f 100644
--- a/dish_grpc_influx2.py
+++ b/dish_grpc_influx2.py
@@ -241,24 +241,24 @@ def loop_body(opts, gstate, shutdown=False):
                 # save off counter value for script restart
                 points[-1]["fields"]["counter"] = counter + count
 
-    now = time.time()
-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_add_item,
-                              cb_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_add_item,
+                                                  cb_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)
     if rc:
         return rc
 
     for category in fields:
         if fields[category]:
+            timestamp = status_ts if category == "status" else hist_ts
             gstate.points.append({
                 "measurement": "spacex.starlink.user_terminal." + category,
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": int(now),
+                "time": timestamp,
                 "fields": fields[category],
             })
 
diff --git a/dish_grpc_mqtt.py b/dish_grpc_mqtt.py
index 020b23d..203ea91 100644
--- a/dish_grpc_mqtt.py
+++ b/dish_grpc_mqtt.py
@@ -100,7 +100,7 @@ def loop_body(opts, gstate):
                     ("starlink/dish_{0}/{1}/{2}".format(category, gstate.dish_id, key),
                      ",".join(str(x) for x in val), 0, False))
 
-    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)
+    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)[0]
 
     if msgs:
         try:
diff --git a/dish_grpc_sqlite.py b/dish_grpc_sqlite.py
index 9688aaf..53e8499 100644
--- a/dish_grpc_sqlite.py
+++ b/dish_grpc_sqlite.py
@@ -122,16 +122,20 @@ def loop_body(opts, gstate, shutdown=False):
                 row.append(counter)
                 hist_rows.append(row)
 
-    now = int(time.time())
     rc = 0
+    status_ts = None
+    hist_ts = None
 
     if not shutdown:
-        rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
+        rc, status_ts = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
 
-    if opts.history_stats_mode and not rc:
+    if opts.history_stats_mode and (not rc or opts.poll_loops > 1):
         if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
             _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
-        rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown)
+        hist_rc, hist_ts = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence,
+                                                         shutdown)
+        if not rc:
+            rc = hist_rc
 
     if not shutdown and opts.bulk_mode and not rc:
         if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
@@ -144,11 +148,12 @@ def loop_body(opts, gstate, shutdown=False):
         cur = gstate.sql_conn.cursor()
         for category, fields in tables.items():
             if fields:
+                timestamp = status_ts if category == "status" else hist_ts
                 sql = 'INSERT OR REPLACE INTO "{0}" ("time","id",{1}) VALUES ({2})'.format(
                     category, ",".join('"' + x + '"' for x in fields),
                     ",".join(repeat("?", len(fields) + 2)))
-                values = [now, gstate.dish_id]
+                values = [timestamp, gstate.dish_id]
                 values.extend(fields.values())
                 cur.execute(sql, values)
                 rows_written += 1
diff --git a/dish_grpc_text.py b/dish_grpc_text.py
index ef55597..69743be 100644
--- a/dish_grpc_text.py
+++ b/dish_grpc_text.py
@@ -93,6 +93,13 @@ def parse_args():
     if (opts.history_stats_mode or opts.satus_mode) and opts.bulk_mode and not opts.verbose:
         parser.error("bulk_history cannot be combined with other modes for CSV output")
 
+    # Technically possible, but a pain to implement, so just disallow it. User
+    # probably doesn't realize how weird it would be, anyway, given that stats
+    # data reports at a different rate from status data in this case.
+    if opts.history_stats_mode and opts.satus_mode and not opts.verbose and opts.poll_loops > 1:
+        parser.error("usage of --poll-loops with history stats modes cannot be mixed with status "
+                     "modes for CSV output")
+
     opts.skip_query |= opts.no_counter | opts.verbose
     if opts.out_file == "-":
         opts.no_stdout_errors = True
@@ -179,10 +186,7 @@ def get_prior_counter(opts, gstate):
 
 
 def loop_body(opts, gstate, print_file, shutdown=False):
-    if opts.verbose:
-        csv_data = []
-    else:
-        csv_data = [datetime.utcnow().replace(microsecond=0).isoformat()]
+    csv_data = []
 
     def xform(val):
         return "" if val is None else str(val)
@@ -224,12 +228,12 @@ def loop_body(opts, gstate, print_file, shutdown=False):
                 fields.extend([xform(val[i]) for val in bulk.values()])
             print(",".join(fields), file=print_file)
 
-    rc = dish_common.get_data(opts,
-                              gstate,
-                              cb_data_add_item,
-                              cb_data_add_sequence,
-                              add_bulk=cb_add_bulk,
-                              flush_history=shutdown)
+    rc, status_ts, hist_ts = dish_common.get_data(opts,
+                                                  gstate,
+                                                  cb_data_add_item,
+                                                  cb_data_add_sequence,
+                                                  add_bulk=cb_add_bulk,
+                                                  flush_history=shutdown)
 
     if opts.verbose:
         if csv_data:
@@ -237,8 +241,9 @@
         if opts.loop_interval > 0.0:
             print(file=print_file)
     else:
-        # skip if only timestamp
-        if len(csv_data) > 1:
+        if csv_data:
+            timestamp = status_ts if status_ts is not None else hist_ts
+            csv_data.insert(0, datetime.utcfromtimestamp(timestamp).isoformat())
             print(",".join(csv_data), file=print_file)
 
     return rc
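
Note (illustration only, not part of the patch): get_data() now returns a
(rc, status_ts, hist_ts) 3-tuple instead of a bare return code, and each
output script stamps a data category with the matching timestamp. Below is
a minimal sketch of a hypothetical caller, assuming the usual dish_common
plumbing (create_arg_parser, run_arg_parser, GlobalState, opts.target) and
the callback shapes used by the scripts in this patch; collect_once() and
main() are invented for illustration:

    import sys

    import dish_common


    def collect_once(opts, gstate):
        """Run one collection pass and print what would be recorded."""
        fields = {}

        def cb_add_item(key, val, category):
            fields.setdefault(category, {})[key] = val

        def cb_add_sequence(key, val, category, start):
            # start is the counter of the first sample in the sequence;
            # ignored in this sketch
            fields.setdefault(category, {})[key] = ",".join(str(x) for x in val)

        rc, status_ts, hist_ts = dish_common.get_data(opts, gstate, cb_add_item,
                                                      cb_add_sequence)
        if rc:
            return rc

        for category, data in fields.items():
            # "status" data is stamped with the time of the status poll;
            # history stats data gets the time of the most recent successful
            # history poll in the accumulated set, which may be older if a
            # later poll failed
            timestamp = status_ts if category == "status" else hist_ts
            print(timestamp, category, data)
        return 0


    def main():
        parser = dish_common.create_arg_parser(output_description="print it to stdout")
        opts = dish_common.run_arg_parser(parser)
        gstate = dish_common.GlobalState(target=opts.target)
        try:
            sys.exit(collect_once(opts, gstate))
        finally:
            gstate.shutdown()


    if __name__ == "__main__":
        main()

This per-category split is what the influx, influx2, and sqlite hunks above
implement: status_ts for the "status" category, hist_ts for everything
else. With --poll-loops greater than 1 the two timestamps can legitimately
differ, which is why the CSV path in dish_grpc_text.py now rejects that
combination of modes.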