Resume from last counter for history stats
Currently only implemented for sqlite, since with InfluxDB, there is the complication that the InfluxDB server may not be available to query at script start time. Also, it only applies when polling all samples, which is not the default, and even then can be disabled with either --skip-query or --no-counter options. Remove the crontab instructions from the README, since the periodic loop functionality is probably now a better approach for periodic recording of stats data.
This commit is contained in:
parent
206bbbf919
commit
be776cde1c
3 changed files with 49 additions and 44 deletions
11
README.md
11
README.md
|
@ -65,11 +65,6 @@ python3 dish_grpc_text.py status obstruction_detail alert_detail
|
||||||
|
|
||||||
By default, `dish_grpc_text.py` (and `dish_json_text.py`, described below) will output in CSV format. You can use the `-v` option to instead output in a (slightly) more human-readable format.
|
By default, `dish_grpc_text.py` (and `dish_json_text.py`, described below) will output in CSV format. You can use the `-v` option to instead output in a (slightly) more human-readable format.
|
||||||
|
|
||||||
To collect and record packet loss summary stats at the top of every hour, you could put something like the following in your user crontab (assuming you have moved the scripts to ~/bin and made them executable):
|
|
||||||
```
|
|
||||||
00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dish_grpc_text.py -H >~/dishStats.csv; ~/bin/dish_grpc_text.py ping_drop >>~/dishStats.csv
|
|
||||||
```
|
|
||||||
|
|
||||||
By default, all of these scripts will pull data once, send it off to the specified data backend, and then exit. They can instead be made to run in a periodic loop by passing a `-t` option to specify loop interval, in seconds. For example, to capture status information to a InfluxDB server every 30 seconds, you could do something like this:
|
By default, all of these scripts will pull data once, send it off to the specified data backend, and then exit. They can instead be made to run in a periodic loop by passing a `-t` option to specify loop interval, in seconds. For example, to capture status information to a InfluxDB server every 30 seconds, you could do something like this:
|
||||||
```shell script
|
```shell script
|
||||||
python3 dish_grpc_influx.py -t 30 [... probably other args to specify server options ...] status
|
python3 dish_grpc_influx.py -t 30 [... probably other args to specify server options ...] status
|
||||||
|
@ -131,11 +126,9 @@ You'll probably want to run with the `-t` option to `dish_grpc_influx.py` to col
|
||||||
|
|
||||||
## To Be Done (Maybe)
|
## To Be Done (Maybe)
|
||||||
|
|
||||||
Maybe more data backend options. If there's one you'd like to see supported, please open a feature request issue.
|
|
||||||
|
|
||||||
There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.
|
There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.
|
||||||
|
|
||||||
Proper Python packaging, since the dependency list keeps growing....
|
No further data collection functionality is planned at this time. If there's something you'd like to see added, please feel free to open a feature request issue. Bear in mind, though, that functionality will be limited to that which the Starlink gRPC services support. In general, those services are limited to what is required by the Starlink app, so unless the app has some related feature, it is unlikely the gRPC services will be sufficient to implement it in these tools.
|
||||||
|
|
||||||
## Other Tidbits
|
## Other Tidbits
|
||||||
|
|
||||||
|
@ -143,7 +136,7 @@ The Starlink Android app actually uses port 9201 instead of 9200. Both appear to
|
||||||
|
|
||||||
The Starlink router also exposes a gRPC service, on ports 9000 (HTTP/2.0) and 9001 (HTTP/1.1).
|
The Starlink router also exposes a gRPC service, on ports 9000 (HTTP/2.0) and 9001 (HTTP/1.1).
|
||||||
|
|
||||||
The file `get_history_notes.txt` has my original ramblings on how to interpret the history buffer data (with the JSON format naming). It may be of interest if you're interested in pulling the `get_history` grpc data directly and don't want to dig through the convoluted logic in the `starlink-grpc` module.
|
The file `get_history_notes.txt` has my original ramblings on how to interpret the history buffer data (with the JSON format naming). It may be of interest if you want to pull the `get_history` grpc data directly and don't want to dig through the convoluted logic in the `starlink_grpc` module.
|
||||||
|
|
||||||
## Related Projects
|
## Related Projects
|
||||||
|
|
||||||
|
|
|
@ -190,14 +190,27 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
|
||||||
Returns:
|
Returns:
|
||||||
1 if there were any failures getting data from the dish, otherwise 0.
|
1 if there were any failures getting data from the dish, otherwise 0.
|
||||||
"""
|
"""
|
||||||
def add_data(data, category):
|
rc = get_status_data(opts, gstate, add_item, add_sequence)
|
||||||
for key, val in data.items():
|
|
||||||
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
|
|
||||||
if seq is None:
|
|
||||||
add_item(name, val, category)
|
|
||||||
else:
|
|
||||||
add_sequence(name, val, category, int(start) if start else 0)
|
|
||||||
|
|
||||||
|
if opts.history_stats_mode and not rc:
|
||||||
|
rc = get_history_stats(opts, gstate, add_item, add_sequence)
|
||||||
|
|
||||||
|
if opts.bulk_mode and add_bulk and not rc:
|
||||||
|
rc = get_bulk_data(opts, gstate, add_bulk)
|
||||||
|
|
||||||
|
return rc
|
||||||
|
|
||||||
|
|
||||||
|
def add_data(data, category, add_item, add_sequence):
|
||||||
|
for key, val in data.items():
|
||||||
|
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
|
||||||
|
if seq is None:
|
||||||
|
add_item(name, val, category)
|
||||||
|
else:
|
||||||
|
add_sequence(name, val, category, int(start) if start else 0)
|
||||||
|
|
||||||
|
|
||||||
|
def get_status_data(opts, gstate, add_item, add_sequence):
|
||||||
if opts.satus_mode:
|
if opts.satus_mode:
|
||||||
try:
|
try:
|
||||||
groups = starlink_grpc.status_data(context=gstate.context)
|
groups = starlink_grpc.status_data(context=gstate.context)
|
||||||
|
@ -217,11 +230,11 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
|
||||||
gstate.dish_id = status_data["id"]
|
gstate.dish_id = status_data["id"]
|
||||||
del status_data["id"]
|
del status_data["id"]
|
||||||
if "status" in opts.mode:
|
if "status" in opts.mode:
|
||||||
add_data(status_data, "status")
|
add_data(status_data, "status", add_item, add_sequence)
|
||||||
if "obstruction_detail" in opts.mode:
|
if "obstruction_detail" in opts.mode:
|
||||||
add_data(obstruct_detail, "status")
|
add_data(obstruct_detail, "status", add_item, add_sequence)
|
||||||
if "alert_detail" in opts.mode:
|
if "alert_detail" in opts.mode:
|
||||||
add_data(alert_detail, "status")
|
add_data(alert_detail, "status", add_item, add_sequence)
|
||||||
elif opts.need_id and gstate.dish_id is None:
|
elif opts.need_id and gstate.dish_id is None:
|
||||||
try:
|
try:
|
||||||
gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
|
gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
|
||||||
|
@ -231,9 +244,11 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
|
||||||
if opts.verbose:
|
if opts.verbose:
|
||||||
print("Using dish ID: " + gstate.dish_id)
|
print("Using dish ID: " + gstate.dish_id)
|
||||||
|
|
||||||
if not opts.history_stats_mode and not (opts.bulk_mode and add_bulk):
|
return 0
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_history_stats(opts, gstate, add_item, add_sequence):
|
||||||
|
"""Fetch history stats. See `get_data` for details."""
|
||||||
try:
|
try:
|
||||||
history = starlink_grpc.get_history(context=gstate.context)
|
history = starlink_grpc.get_history(context=gstate.context)
|
||||||
except grpc.RpcError as e:
|
except grpc.RpcError as e:
|
||||||
|
@ -242,17 +257,6 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
|
||||||
if history is None:
|
if history is None:
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
if opts.history_stats_mode:
|
|
||||||
get_history_stats(opts, gstate, add_data, history)
|
|
||||||
|
|
||||||
if opts.bulk_mode and add_bulk:
|
|
||||||
return get_bulk_data(opts, gstate, add_bulk)
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
def get_history_stats(opts, gstate, add_data, history):
|
|
||||||
"""Fetch history stats. See `get_data` for details."""
|
|
||||||
if history and gstate.prev_history and history.current < gstate.prev_history.current:
|
if history and gstate.prev_history and history.current < gstate.prev_history.current:
|
||||||
if opts.verbose:
|
if opts.verbose:
|
||||||
print("Dish reboot detected. Restarting loop polling count.")
|
print("Dish reboot detected. Restarting loop polling count.")
|
||||||
|
@ -277,20 +281,22 @@ def get_history_stats(opts, gstate, add_data, history):
|
||||||
verbose=opts.verbose,
|
verbose=opts.verbose,
|
||||||
history=history)
|
history=history)
|
||||||
general, ping, runlen, latency, loaded, usage = groups[0:6]
|
general, ping, runlen, latency, loaded, usage = groups[0:6]
|
||||||
add_data(general, "ping_stats")
|
add_data(general, "ping_stats", add_item, add_sequence)
|
||||||
if "ping_drop" in opts.mode:
|
if "ping_drop" in opts.mode:
|
||||||
add_data(ping, "ping_stats")
|
add_data(ping, "ping_stats", add_item, add_sequence)
|
||||||
if "ping_run_length" in opts.mode:
|
if "ping_run_length" in opts.mode:
|
||||||
add_data(runlen, "ping_stats")
|
add_data(runlen, "ping_stats", add_item, add_sequence)
|
||||||
if "ping_latency" in opts.mode:
|
if "ping_latency" in opts.mode:
|
||||||
add_data(latency, "ping_stats")
|
add_data(latency, "ping_stats", add_item, add_sequence)
|
||||||
if "ping_loaded_latency" in opts.mode:
|
if "ping_loaded_latency" in opts.mode:
|
||||||
add_data(loaded, "ping_stats")
|
add_data(loaded, "ping_stats", add_item, add_sequence)
|
||||||
if "usage" in opts.mode:
|
if "usage" in opts.mode:
|
||||||
add_data(usage, "usage")
|
add_data(usage, "usage", add_item, add_sequence)
|
||||||
if not opts.no_counter:
|
if not opts.no_counter:
|
||||||
gstate.counter_stats = general["end_counter"]
|
gstate.counter_stats = general["end_counter"]
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def get_bulk_data(opts, gstate, add_bulk):
|
def get_bulk_data(opts, gstate, add_bulk):
|
||||||
"""Fetch bulk data. See `get_data` for details."""
|
"""Fetch bulk data. See `get_data` for details."""
|
||||||
|
|
|
@ -69,15 +69,17 @@ def parse_args():
|
||||||
|
|
||||||
opts = dish_common.run_arg_parser(parser, need_id=True)
|
opts = dish_common.run_arg_parser(parser, need_id=True)
|
||||||
|
|
||||||
|
opts.skip_query |= opts.no_counter
|
||||||
|
|
||||||
return opts
|
return opts
|
||||||
|
|
||||||
|
|
||||||
def query_counter(opts, gstate):
|
def query_counter(opts, gstate, column, table):
|
||||||
now = time.time()
|
now = time.time()
|
||||||
cur = gstate.sql_conn.cursor()
|
cur = gstate.sql_conn.cursor()
|
||||||
cur.execute(
|
cur.execute(
|
||||||
'SELECT "time", "counter" FROM "history" WHERE "time"<? AND "id"=? '
|
'SELECT "time", "{0}" FROM "{1}" WHERE "time"<? AND "id"=? '
|
||||||
'ORDER BY "time" DESC LIMIT 1', (now, gstate.dish_id))
|
'ORDER BY "time" DESC LIMIT 1'.format(column, table), (now, gstate.dish_id))
|
||||||
row = cur.fetchone()
|
row = cur.fetchone()
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|
||||||
|
@ -102,7 +104,6 @@ def loop_body(opts, gstate):
|
||||||
tables[category][key] = ",".join(str(subv) if subv is not None else "" for subv in val)
|
tables[category][key] = ",".join(str(subv) if subv is not None else "" for subv in val)
|
||||||
|
|
||||||
def cb_add_bulk(bulk, count, timestamp, counter):
|
def cb_add_bulk(bulk, count, timestamp, counter):
|
||||||
nonlocal hist_cols
|
|
||||||
if len(hist_cols) == 2:
|
if len(hist_cols) == 2:
|
||||||
hist_cols.extend(bulk.keys())
|
hist_cols.extend(bulk.keys())
|
||||||
hist_cols.append("counter")
|
hist_cols.append("counter")
|
||||||
|
@ -115,11 +116,16 @@ def loop_body(opts, gstate):
|
||||||
hist_rows.append(row)
|
hist_rows.append(row)
|
||||||
|
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)
|
rc = dish_common.get_status_data(opts, gstate, cb_add_item, cb_add_sequence)
|
||||||
|
|
||||||
|
if opts.history_stats_mode and not rc:
|
||||||
|
if gstate.counter_stats is None and not opts.skip_query and opts.samples < 0:
|
||||||
|
_, gstate.counter_stats = query_counter(opts, gstate, "end_counter", "ping_stats")
|
||||||
|
rc = dish_common.get_history_stats(opts, gstate, cb_add_item, cb_add_sequence)
|
||||||
|
|
||||||
if opts.bulk_mode and not rc:
|
if opts.bulk_mode and not rc:
|
||||||
if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
|
if gstate.counter is None and not opts.skip_query and opts.bulk_samples < 0:
|
||||||
gstate.timestamp, gstate.counter = query_counter(opts, gstate)
|
gstate.timestamp, gstate.counter = query_counter(opts, gstate, "counter", "history")
|
||||||
rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)
|
rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)
|
||||||
|
|
||||||
rows_written = 0
|
rows_written = 0
|
||||||
|
|
Loading…
Reference in a new issue