Add first pass of bulk data mode

Adds a new option, -b, that will write the individual sample data to the InfluxDB server instead of summary data.
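For example (the script filename below is a guess; this commit does not show it), bulk mode with verbose output would be run as:

python3 dishStatsInflux.py -b -v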
sparky8512 2021-01-17 16:29:56 -08:00
parent 9a57c93d73
commit 0663008be7
2 changed files with 127 additions and 44 deletions


@@ -37,7 +37,7 @@ def main():
     arg_error = False
     try:
-        opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:D:IP:R:SU:")
+        opts, args = getopt.getopt(sys.argv[1:], "abhn:p:rs:t:vC:D:IP:R:SU:")
     except getopt.GetoptError as err:
         print(str(err))
         arg_error = True
@@ -49,6 +49,7 @@ def main():
     verbose = False
     default_loop_time = 0
     loop_time = default_loop_time
+    bulk_mode = False
     run_lengths = False
     host_default = "localhost"
     database_default = "starlinkstats"
@@ -92,6 +93,8 @@ def main():
     for opt, arg in opts:
         if opt == "-a":
             samples = -1
+        elif opt == "-b":
+            bulk_mode = True
         elif opt == "-h":
             print_usage = True
         elif opt == "-n":
@@ -132,6 +135,7 @@ def main():
         print("Usage: " + sys.argv[0] + " [options...]")
         print("Options:")
         print("    -a: Parse all valid samples")
+        print("    -b: Bulk mode: write individual sample data instead of summary stats")
         print("    -h: Be helpful")
         print("    -n <name>: Hostname of InfluxDB server, default: " + host_default)
         print("    -p <num>: Port number to use on InfluxDB server")
@@ -182,25 +186,28 @@ def main():
         return 0
 
-    def loop_body(client):
-        if gstate.dish_id is None:
-            try:
-                gstate.dish_id = starlink_grpc.get_id()
-                if verbose:
-                    print("Using dish ID: " + gstate.dish_id)
-            except starlink_grpc.GrpcError as e:
-                conn_error("Failure getting dish ID: %s", str(e))
-                return 1
-
+    def process_bulk_data():
         timestamp = datetime.datetime.utcnow()
-
-        try:
-            g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
-        except starlink_grpc.GrpcError as e:
-            conn_error("Failure getting ping stats: %s", str(e))
-            return 1
-
-        all_stats = g_stats.copy()
+        general, bulk = starlink_grpc.history_bulk_data(samples, verbose)
+
+        parsed_samples = general["samples"]
+        for i in range(parsed_samples):
+            gstate.points.append({
+                "measurement": "spacex.starlink.user_terminal.history",
+                "tags": {
+                    "id": gstate.dish_id
+                },
+                "time": timestamp + datetime.timedelta(seconds=i - parsed_samples),
+                "fields": {k: v[i] for k, v in bulk.items()},
+            })
+
+    def process_ping_stats():
+        timestamp = datetime.datetime.utcnow()
+        general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
+
+        all_stats = general.copy()
         all_stats.update(pd_stats)
         if run_lengths:
             for k, v in rl_stats.items():
@@ -218,6 +225,30 @@ def main():
                 "time": timestamp,
                 "fields": all_stats,
             })
 
+    def loop_body(client):
+        if gstate.dish_id is None:
+            try:
+                gstate.dish_id = starlink_grpc.get_id()
+                if verbose:
+                    print("Using dish ID: " + gstate.dish_id)
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting dish ID: %s", str(e))
+                return 1
+
+        if bulk_mode:
+            try:
+                process_bulk_data()
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting history: %s", str(e))
+                return 1
+        else:
+            try:
+                process_ping_stats()
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting ping stats: %s", str(e))
+                return 1
+
         if verbose:
             print("Data points queued: " + str(len(gstate.points)))


@@ -10,6 +10,11 @@ General statistics:
     The sample interval is currently 1 second.
 
     samples: The number of valid samples analyzed.
+    current: XXX explain
+
+Bulk history data:
+    XXX to be written, but it'll be the same as some of the items in status
+    info, just as lists for each.
 
 General ping drop (packet loss) statistics:
     This group of statistics characterize the packet loss (labeled "ping drop"
@@ -136,6 +141,7 @@ def history_ping_field_names():
     """
     return [
         "samples",
+        "current",
     ], [
         "total_ping_drop",
         "count_full_ping_drop",
@@ -165,6 +171,77 @@ def get_history():
     return response.dish_get_history
 
+def compute_sample_range(history, parse_samples, verbose=False):
+    # 'current' is the count of data samples written to the ring buffer,
+    # irrespective of buffer wrap.
+    current = int(history.current)
+    samples = len(history.pop_ping_drop_rate)
+
+    if verbose:
+        print("current counter: " + str(current))
+        print("All samples: " + str(samples))
+
+    samples = min(samples, current)
+
+    if verbose:
+        print("Valid samples: " + str(samples))
+
+    # This is ring buffer offset, so both index to oldest data sample and
+    # index to next data sample after the newest one.
+    offset = current % samples
+
+    if parse_samples < 0 or samples < parse_samples:
+        parse_samples = samples
+
+    # Parse the most recent parse_samples-sized set of samples. This will
+    # iterate samples in order from oldest to newest.
+    if parse_samples <= offset:
+        sample_range = range(offset - parse_samples, offset)
+    else:
+        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
+
+    return sample_range, parse_samples, current
+
+def history_bulk_data(parse_samples, verbose=False):
+    try:
+        history = get_history()
+    except grpc.RpcError as e:
+        raise GrpcError(e)
+
+    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
+
+    pop_ping_drop_rate = []
+    pop_ping_latency_ms = []
+    downlink_throughput_bps = []
+    uplink_throughput_bps = []
+    snr = []
+    scheduled = []
+    obstructed = []
+
+    for i in sample_range:
+        pop_ping_drop_rate.append(history.pop_ping_drop_rate[i])
+        pop_ping_latency_ms.append(history.pop_ping_latency_ms[i])
+        downlink_throughput_bps.append(history.downlink_throughput_bps[i])
+        uplink_throughput_bps.append(history.uplink_throughput_bps[i])
+        snr.append(history.snr[i])
+        scheduled.append(history.scheduled[i])
+        obstructed.append(history.obstructed[i])
+
+    return {
+        "samples": parse_samples,
+        "current": current,
+    }, {
+        "pop_ping_drop_rate": pop_ping_drop_rate,
+        "pop_ping_latency_ms": pop_ping_latency_ms,
+        "downlink_throughput_bps": downlink_throughput_bps,
+        "uplink_throughput_bps": uplink_throughput_bps,
+        "snr": snr,
+        "scheduled": scheduled,
+        "obstructed": obstructed,
+    }
+
 def history_ping_stats(parse_samples, verbose=False):
     """Fetch, parse, and compute the packet loss stats.
@@ -187,23 +264,7 @@ def history_ping_stats(parse_samples, verbose=False):
     except grpc.RpcError as e:
         raise GrpcError(e)
 
-    # 'current' is the count of data samples written to the ring buffer,
-    # irrespective of buffer wrap.
-    current = int(history.current)
-    samples = len(history.pop_ping_drop_rate)
-
-    if verbose:
-        print("current counter: " + str(current))
-        print("All samples: " + str(samples))
-
-    samples = min(samples, current)
-
-    if verbose:
-        print("Valid samples: " + str(samples))
-
-    # This is ring buffer offset, so both index to oldest data sample and
-    # index to next data sample after the newest one.
-    offset = current % samples
+    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
 
     tot = 0.0
     count_full_drop = 0
@@ -219,16 +280,6 @@ def history_ping_stats(parse_samples, verbose=False):
     run_length = 0
     init_run_length = None
 
-    if parse_samples < 0 or samples < parse_samples:
-        parse_samples = samples
-
-    # Parse the most recent parse_samples-sized set of samples. This will
-    # iterate samples in order from oldest to newest.
-    if parse_samples <= offset:
-        sample_range = range(offset - parse_samples, offset)
-    else:
-        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
-
     for i in sample_range:
         d = history.pop_ping_drop_rate[i]
         if d >= 1:
@@ -272,6 +323,7 @@ def history_ping_stats(parse_samples, verbose=False):
 
     return {
         "samples": parse_samples,
+        "current": current,
     }, {
         "total_ping_drop": tot,
         "count_full_ping_drop": count_full_drop,