diff --git a/dish_common.py b/dish_common.py index 54b658b..ced9f4d 100644 --- a/dish_common.py +++ b/dish_common.py @@ -132,6 +132,9 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False): opts.samples = int(opts.loop_interval) if opts.loop_interval >= 1.0 else -1 opts.bulk_samples = -1 else: + # for scripts that query starting history counter, skip it if samples + # was explicitly set + opts.skip_query = True opts.bulk_samples = opts.samples opts.no_stdout_errors = no_stdout_errors @@ -153,15 +156,17 @@ def conn_error(opts, msg, *args): class GlobalState: """A class for keeping state across loop iterations.""" def __init__(self, target=None): - # counter for bulk_history: + # counter, timestamp for bulk_history: self.counter = None - # counter for history stats: - self.counter_stats = None self.timestamp = None + # counter, timestamp for history stats: + self.counter_stats = None + self.timestamp_stats = None self.dish_id = None self.context = starlink_grpc.ChannelContext(target=target) self.poll_count = 0 self.accum_history = None + self.first_poll = True def shutdown(self): self.context.close() @@ -195,23 +200,32 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history= would otherwise be lost on script restart. Returns: - 1 if there were any failures getting data from the dish, otherwise 0. + Tuple with 3 values. The first value is 1 if there were any failures + getting data from the dish, otherwise 0. The second value is an int + timestamp for status data (data with category "status"), or None if + no status data was reported. The third value is an int timestamp for + history stats data (non-bulk data with category other than "status"), + or None if no history stats data was reported. """ if flush_history and opts.poll_loops < 2: - return 0 + return 0, None, None rc = 0 + status_ts = None + hist_ts = None if not flush_history: - rc = get_status_data(opts, gstate, add_item, add_sequence) + rc, status_ts = get_status_data(opts, gstate, add_item, add_sequence) - if opts.history_stats_mode and not rc: - rc = get_history_stats(opts, gstate, add_item, add_sequence, flush_history) + if opts.history_stats_mode and (not rc or opts.poll_loops > 1): + hist_rc, hist_ts = get_history_stats(opts, gstate, add_item, add_sequence, flush_history) + if not rc: + rc = hist_rc if not flush_history and opts.bulk_mode and add_bulk and not rc: rc = get_bulk_data(opts, gstate, add_bulk) - return rc + return rc, status_ts, hist_ts def add_data_normal(data, category, add_item, add_sequence): @@ -237,6 +251,7 @@ def add_data_numeric(data, category, add_item, add_sequence): def get_status_data(opts, gstate, add_item, add_sequence): if opts.satus_mode: + timestamp = int(time.time()) try: groups = starlink_grpc.status_data(context=gstate.context) status_data, obstruct_detail, alert_detail = groups[0:3] @@ -244,13 +259,13 @@ def get_status_data(opts, gstate, add_item, add_sequence): if "status" in opts.mode: if opts.need_id and gstate.dish_id is None: conn_error(opts, "Dish unreachable and ID unknown, so not recording state") - return 1 + return 1, None if opts.verbose: print("Dish unreachable") add_item("state", "DISH_UNREACHABLE", "status") - return 0 + return 0, timestamp conn_error(opts, "Failure getting status: %s", str(e)) - return 1 + return 1, None if opts.need_id: gstate.dish_id = status_data["id"] del status_data["id"] @@ -261,25 +276,28 @@ def get_status_data(opts, gstate, add_item, add_sequence): add_data(obstruct_detail, "status", add_item, add_sequence) if "alert_detail" in opts.mode: add_data(alert_detail, "status", add_item, add_sequence) + return 0, timestamp elif opts.need_id and gstate.dish_id is None: try: gstate.dish_id = starlink_grpc.get_id(context=gstate.context) except starlink_grpc.GrpcError as e: conn_error(opts, "Failure getting dish ID: %s", str(e)) - return 1 + return 1, None if opts.verbose: print("Using dish ID: " + gstate.dish_id) - return 0 + return 0, None def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): """Fetch history stats. See `get_data` for details.""" - if flush_history: + if flush_history or (opts.need_id and gstate.dish_id is None): history = None else: try: + timestamp = int(time.time()) history = starlink_grpc.get_history(context=gstate.context) + gstate.timestamp_stats = timestamp except grpc.RpcError as e: conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) history = None @@ -306,17 +324,28 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): else: gstate.accum_history = history + # When resuming from prior count with --poll-loops set, advance the loop + # count by however many loops worth of data was caught up on. This helps + # avoid abnormally large sample counts in the first set of output data. + if gstate.first_poll and gstate.accum_history: + if opts.poll_loops > 1 and gstate.counter_stats: + new_samples = gstate.accum_history.current - gstate.counter_stats + if new_samples < 0: + new_samples = gstate.accum_history.current + if new_samples > len(gstate.accum_history.pop_ping_drop_rate): + new_samples = len(gstate.accum_history.pop_ping_drop_rate) + gstate.poll_count = max(gstate.poll_count, int((new_samples-1) / opts.loop_interval)) + gstate.first_poll = False + if gstate.poll_count < opts.poll_loops - 1 and not flush_history: gstate.poll_count += 1 - return 0 - - # This can happen if all polling attempts failed. Verbose output has - # already happened, so just return. - if gstate.accum_history is None: - return 0 if flush_history else 1 + return 0, None gstate.poll_count = 0 + if gstate.accum_history is None: + return (0, None) if flush_history else (1, None) + groups = starlink_grpc.history_stats(parse_samples, start=start, verbose=opts.verbose, @@ -337,9 +366,11 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): if not opts.no_counter: gstate.counter_stats = general["end_counter"] + timestamp = gstate.timestamp_stats + gstate.timestamp_stats = None gstate.accum_history = None - return 0 + return 0, timestamp def get_bulk_data(opts, gstate, add_bulk): diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py index c25c38d..81b2766 100644 --- a/dish_grpc_influx.py +++ b/dish_grpc_influx.py @@ -51,7 +51,8 @@ def handle_sigterm(signum, frame): def parse_args(): - parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 1.x database") + parser = dish_common.create_arg_parser( + output_description="write it to an InfluxDB 1.x database") group = parser.add_argument_group(title="InfluxDB 1.x database options") group.add_argument("-n", @@ -250,24 +251,24 @@ def loop_body(opts, gstate, shutdown=False): # save off counter value for script restart points[-1]["fields"]["counter"] = counter + count - now = time.time() - rc = dish_common.get_data(opts, - gstate, - cb_add_item, - cb_add_sequence, - add_bulk=cb_add_bulk, - flush_history=shutdown) + rc, status_ts, hist_ts = dish_common.get_data(opts, + gstate, + cb_add_item, + cb_add_sequence, + add_bulk=cb_add_bulk, + flush_history=shutdown) if rc: return rc for category in fields: if fields[category]: + timestamp = status_ts if category == "status" else hist_ts gstate.points.append({ "measurement": "spacex.starlink.user_terminal." + category, "tags": { "id": gstate.dish_id }, - "time": int(now), + "time": timestamp, "fields": fields[category], }) diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py index efd482f..ba6f66f 100644 --- a/dish_grpc_influx2.py +++ b/dish_grpc_influx2.py @@ -241,24 +241,24 @@ def loop_body(opts, gstate, shutdown=False): # save off counter value for script restart points[-1]["fields"]["counter"] = counter + count - now = time.time() - rc = dish_common.get_data(opts, - gstate, - cb_add_item, - cb_add_sequence, - add_bulk=cb_add_bulk, - flush_history=shutdown) + rc, status_ts, hist_ts = dish_common.get_data(opts, + gstate, + cb_add_item, + cb_add_sequence, + add_bulk=cb_add_bulk, + flush_history=shutdown) if rc: return rc for category in fields: if fields[category]: + timestamp = status_ts if category == "status" else hist_ts gstate.points.append({ "measurement": "spacex.starlink.user_terminal." + category, "tags": { "id": gstate.dish_id }, - "time": int(now), + "time": timestamp, "fields": fields[category], }) diff --git a/dish_grpc_mqtt.py b/dish_grpc_mqtt.py index 020b23d..203ea91 100644 --- a/dish_grpc_mqtt.py +++ b/dish_grpc_mqtt.py @@ -100,7 +100,7 @@ def loop_body(opts, gstate): ("starlink/dish_{0}/{1}/{2}".format(category, gstate.dish_id, key), ",".join(str(x) for x in val), 0, False)) - rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence) + rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)[0] if msgs: try: diff --git a/dish_grpc_sqlite.py b/dish_grpc_sqlite.py index 9688aaf..53e8499 100644 --- a/dish_grpc_sqlite.py +++ b/dish_grpc_sqlite.py @@ -122,16 +122,20 @@ def loop_body(opts, gstate, shutdown=False): row.append(counter) hist_rows.append(row) - now = int(time.time()) rc = 0 + status_ts = None + hist_ts = None if not shutdown: - rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence) + rc, status_ts = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence) - if opts.history_stats_mode and not rc: + if opts.history_stats_mode and (not rc or opts.poll_loops > 1): if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0: _, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats") - rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, shutdown) + hist_rc, hist_ts = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence, + shutdown) + if not rc: + rc = hist_rc if not shutdown and opts.bulk_mode and not rc: if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0: @@ -144,11 +148,12 @@ def loop_body(opts, gstate, shutdown=False): cur = gstate.sql_conn.cursor() for category, fields in tables.items(): if fields: + timestamp = status_ts if category == "status" else hist_ts sql = 'INSERT OR REPLACE INTO "{0}" ("time","id",{1}) VALUES ({2})'.format( category, ",".join('"' + x + '"' for x in fields), ",".join(repeat("?", len(fields) + 2))) - values = [now, gstate.dish_id] + values = [timestamp, gstate.dish_id] values.extend(fields.values()) cur.execute(sql, values) rows_written += 1 diff --git a/dish_grpc_text.py b/dish_grpc_text.py index ef55597..69743be 100644 --- a/dish_grpc_text.py +++ b/dish_grpc_text.py @@ -93,6 +93,13 @@ def parse_args(): if (opts.history_stats_mode or opts.satus_mode) and opts.bulk_mode and not opts.verbose: parser.error("bulk_history cannot be combined with other modes for CSV output") + # Technically possible, but a pain to implement, so just disallow it. User + # probably doesn't realize how weird it would be, anyway, given that stats + # data reports at a different rate from status data in this case. + if opts.history_stats_mode and opts.satus_mode and not opts.verbose and opts.poll_loops > 1: + parser.error("usage of --poll-loops with history stats modes cannot be mixed with status " + "modes for CSV output") + opts.skip_query |= opts.no_counter | opts.verbose if opts.out_file == "-": opts.no_stdout_errors = True @@ -179,10 +186,7 @@ def get_prior_counter(opts, gstate): def loop_body(opts, gstate, print_file, shutdown=False): - if opts.verbose: - csv_data = [] - else: - csv_data = [datetime.utcnow().replace(microsecond=0).isoformat()] + csv_data = [] def xform(val): return "" if val is None else str(val) @@ -224,12 +228,12 @@ def loop_body(opts, gstate, print_file, shutdown=False): fields.extend([xform(val[i]) for val in bulk.values()]) print(",".join(fields), file=print_file) - rc = dish_common.get_data(opts, - gstate, - cb_data_add_item, - cb_data_add_sequence, - add_bulk=cb_add_bulk, - flush_history=shutdown) + rc, status_ts, hist_ts = dish_common.get_data(opts, + gstate, + cb_data_add_item, + cb_data_add_sequence, + add_bulk=cb_add_bulk, + flush_history=shutdown) if opts.verbose: if csv_data: @@ -237,8 +241,9 @@ def loop_body(opts, gstate, print_file, shutdown=False): if opts.loop_interval > 0.0: print(file=print_file) else: - # skip if only timestamp - if len(csv_data) > 1: + if csv_data: + timestamp = status_ts if status_ts is not None else hist_ts + csv_data.insert(0, datetime.utcfromtimestamp(timestamp).isoformat()) print(",".join(csv_data), file=print_file) return rc