From 0663008be708b8cc0bfef05952c0a5e3fd0666ea Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Sun, 17 Jan 2021 16:29:56 -0800 Subject: [PATCH] Add first pass of bulk data mode Adds a new option, -b, that will write the individual sample data to InfluxDB server, instead of summary data. --- dishHistoryInflux.py | 65 +++++++++++++++++++------- starlink_grpc.py | 106 ++++++++++++++++++++++++++++++++----------- 2 files changed, 127 insertions(+), 44 deletions(-) diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index fcecaec..0e1380a 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -37,7 +37,7 @@ def main(): arg_error = False try: - opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:D:IP:R:SU:") + opts, args = getopt.getopt(sys.argv[1:], "abhn:p:rs:t:vC:D:IP:R:SU:") except getopt.GetoptError as err: print(str(err)) arg_error = True @@ -49,6 +49,7 @@ def main(): verbose = False default_loop_time = 0 loop_time = default_loop_time + bulk_mode = False run_lengths = False host_default = "localhost" database_default = "starlinkstats" @@ -92,6 +93,8 @@ def main(): for opt, arg in opts: if opt == "-a": samples = -1 + elif opt == "-b": + bulk_mode = True elif opt == "-h": print_usage = True elif opt == "-n": @@ -132,6 +135,7 @@ def main(): print("Usage: " + sys.argv[0] + " [options...]") print("Options:") print(" -a: Parse all valid samples") + print(" -b: Bulk mode: write individual sample data instead of summary stats") print(" -h: Be helpful") print(" -n : Hostname of InfluxDB server, default: " + host_default) print(" -p : Port number to use on InfluxDB server") @@ -182,25 +186,28 @@ def main(): return 0 - def loop_body(client): - if gstate.dish_id is None: - try: - gstate.dish_id = starlink_grpc.get_id() - if verbose: - print("Using dish ID: " + gstate.dish_id) - except starlink_grpc.GrpcError as e: - conn_error("Failure getting dish ID: %s", str(e)) - return 1 - + def process_bulk_data(): timestamp = datetime.datetime.utcnow() - try: - g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) - except starlink_grpc.GrpcError as e: - conn_error("Failure getting ping stats: %s", str(e)) - return 1 + general, bulk = starlink_grpc.history_bulk_data(samples, verbose) - all_stats = g_stats.copy() + parsed_samples = general["samples"] + for i in range(parsed_samples): + gstate.points.append({ + "measurement": "spacex.starlink.user_terminal.history", + "tags": { + "id": gstate.dish_id + }, + "time": timestamp + datetime.timedelta(seconds=i - parsed_samples), + "fields": {k: v[i] for k, v in bulk.items()}, + }) + + def process_ping_stats(): + timestamp = datetime.datetime.utcnow() + + general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) + + all_stats = general.copy() all_stats.update(pd_stats) if run_lengths: for k, v in rl_stats.items(): @@ -218,6 +225,30 @@ def main(): "time": timestamp, "fields": all_stats, }) + + def loop_body(client): + if gstate.dish_id is None: + try: + gstate.dish_id = starlink_grpc.get_id() + if verbose: + print("Using dish ID: " + gstate.dish_id) + except starlink_grpc.GrpcError as e: + conn_error("Failure getting dish ID: %s", str(e)) + return 1 + + if bulk_mode: + try: + process_bulk_data() + except starlink_grpc.GrpcError as e: + conn_error("Failure getting history: %s", str(e)) + return 1 + else: + try: + process_ping_stats() + except starlink_grpc.GrpcError as e: + conn_error("Failure getting ping stats: %s", str(e)) + return 1 + if verbose: print("Data points queued: " + str(len(gstate.points))) diff --git a/starlink_grpc.py b/starlink_grpc.py index 40e3572..3586529 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -10,6 +10,11 @@ General statistics: The sample interval is currently 1 second. samples: The number of valid samples analyzed. + current: XXX explain + +Bulk history data: + XXX to be written, but it'll be same as some of the items in status info, + just as lists for each. General ping drop (packet loss) statistics: This group of statistics characterize the packet loss (labeled "ping drop" @@ -136,6 +141,7 @@ def history_ping_field_names(): """ return [ "samples", + "current", ], [ "total_ping_drop", "count_full_ping_drop", @@ -165,6 +171,77 @@ def get_history(): return response.dish_get_history +def compute_sample_range(history, parse_samples, verbose=False): + # 'current' is the count of data samples written to the ring buffer, + # irrespective of buffer wrap. + current = int(history.current) + samples = len(history.pop_ping_drop_rate) + + if verbose: + print("current counter: " + str(current)) + print("All samples: " + str(samples)) + + samples = min(samples, current) + + if verbose: + print("Valid samples: " + str(samples)) + + # This is ring buffer offset, so both index to oldest data sample and + # index to next data sample after the newest one. + offset = current % samples + + if parse_samples < 0 or samples < parse_samples: + parse_samples = samples + + # Parse the most recent parse_samples-sized set of samples. This will + # iterate samples in order from oldest to newest. + if parse_samples <= offset: + sample_range = range(offset - parse_samples, offset) + else: + sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) + + return sample_range, parse_samples, current + + +def history_bulk_data(parse_samples, verbose=False): + try: + history = get_history() + except grpc.RpcError as e: + raise GrpcError(e) + + sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose) + + pop_ping_drop_rate = [] + pop_ping_latency_ms = [] + downlink_throughput_bps = [] + uplink_throughput_bps = [] + snr = [] + scheduled = [] + obstructed = [] + + for i in sample_range: + pop_ping_drop_rate.append(history.pop_ping_drop_rate[i]) + pop_ping_latency_ms.append(history.pop_ping_latency_ms[i]) + downlink_throughput_bps.append(history.downlink_throughput_bps[i]) + uplink_throughput_bps.append(history.uplink_throughput_bps[i]) + snr.append(history.snr[i]) + scheduled.append(history.scheduled[i]) + obstructed.append(history.obstructed[i]) + + return { + "samples": parse_samples, + "current": current, + }, { + "pop_ping_drop_rate": pop_ping_drop_rate, + "pop_ping_latency_ms": pop_ping_latency_ms, + "downlink_throughput_bps": downlink_throughput_bps, + "uplink_throughput_bps": uplink_throughput_bps, + "snr": snr, + "scheduled": scheduled, + "obstructed": obstructed, + } + + def history_ping_stats(parse_samples, verbose=False): """Fetch, parse, and compute the packet loss stats. @@ -187,23 +264,7 @@ def history_ping_stats(parse_samples, verbose=False): except grpc.RpcError as e: raise GrpcError(e) - # 'current' is the count of data samples written to the ring buffer, - # irrespective of buffer wrap. - current = int(history.current) - samples = len(history.pop_ping_drop_rate) - - if verbose: - print("current counter: " + str(current)) - print("All samples: " + str(samples)) - - samples = min(samples, current) - - if verbose: - print("Valid samples: " + str(samples)) - - # This is ring buffer offset, so both index to oldest data sample and - # index to next data sample after the newest one. - offset = current % samples + sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose) tot = 0.0 count_full_drop = 0 @@ -219,16 +280,6 @@ def history_ping_stats(parse_samples, verbose=False): run_length = 0 init_run_length = None - if parse_samples < 0 or samples < parse_samples: - parse_samples = samples - - # Parse the most recent parse_samples-sized set of samples. This will - # iterate samples in order from oldest to newest. - if parse_samples <= offset: - sample_range = range(offset - parse_samples, offset) - else: - sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset)) - for i in sample_range: d = history.pop_ping_drop_rate[i] if d >= 1: @@ -272,6 +323,7 @@ def history_ping_stats(parse_samples, verbose=False): return { "samples": parse_samples, + "current": current, }, { "total_ping_drop": tot, "count_full_ping_drop": count_full_drop,