initial commit influxDB 2.0 client
This commit is contained in:
parent
859dc84b88
commit
5905fb00f9
1 changed files with 330 additions and 0 deletions
330
dish_grpc_influx2.py
Normal file
330
dish_grpc_influx2.py
Normal file
|
@ -0,0 +1,330 @@
|
|||
#!/usr/bin/python3
|
||||
"""Write Starlink user terminal data to an InfluxDB 2.x database.
|
||||
|
||||
This script pulls the current status info and/or metrics computed from the
|
||||
history data and writes them to the specified InfluxDB 2.x database either once
|
||||
or in a periodic loop.
|
||||
|
||||
Data will be written into the requested database with the following
|
||||
measurement / series names:
|
||||
|
||||
: spacex.starlink.user_terminal.status : Current status data
|
||||
: spacex.starlink.user_terminal.history : Bulk history data
|
||||
: spacex.starlink.user_terminal.ping_stats : Ping history statistics
|
||||
: spacex.starlink.user_terminal.usage : Usage history statistics
|
||||
|
||||
NOTE: The Starlink user terminal does not include time values with its
|
||||
history or status data, so this script uses current system time to compute
|
||||
the timestamps it sends to InfluxDB. It is recommended to run this script on
|
||||
a host that has its system clock synced via NTP. Otherwise, the timestamps
|
||||
may get out of sync with real time.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
import warnings
|
||||
|
||||
from influxdb_client import InfluxDBClient, WriteOptions, WritePrecision
|
||||
|
||||
import dish_common
|
||||
|
||||
HOST_URL = "localhost:8086"
|
||||
BUCKET_DEFAULT = "starlinkstats"
|
||||
BULK_MEASUREMENT = "spacex.starlink.user_terminal.history"
|
||||
FLUSH_LIMIT = 6
|
||||
MAX_BATCH = 5000
|
||||
MAX_QUEUE_LENGTH = 864000
|
||||
|
||||
|
||||
class Terminated(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def handle_sigterm(signum, frame):
|
||||
# Turn SIGTERM into an exception so main loop can clean up
|
||||
raise Terminated
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 2.0 database")
|
||||
|
||||
group = parser.add_argument_group(title="InfluxDB 2.0 database options")
|
||||
group.add_argument("-u",
|
||||
"--url",
|
||||
default=HOST_URL,
|
||||
dest="url",
|
||||
help="URL or IP AND Port of the InfluxDB 2.0 Host, default: " + HOST_URL)
|
||||
group.add_argument("-T", "--token", help="Token to access the bucket")
|
||||
group.add_argument("-B",
|
||||
"--bucket",
|
||||
default=BUCKET_DEFAULT,
|
||||
help="Bucket name to use, default: " + BUCKET_DEFAULT)
|
||||
group.add_argument("-O", "--org", help="Organisation name")
|
||||
group.add_argument("-k",
|
||||
"--skip-query",
|
||||
action="store_true",
|
||||
help="Skip querying for prior sample write point in bulk mode")
|
||||
group.add_argument("-C",
|
||||
"--ca-cert",
|
||||
dest="verify_ssl",
|
||||
help="Enable SSL/TLS using specified CA cert to verify broker",
|
||||
metavar="FILENAME")
|
||||
group.add_argument("-I",
|
||||
"--insecure",
|
||||
action="store_false",
|
||||
dest="verify_ssl",
|
||||
help="Enable SSL/TLS but disable certificate verification (INSECURE!)")
|
||||
group.add_argument("-S",
|
||||
"--secure",
|
||||
action="store_true",
|
||||
dest="verify_ssl",
|
||||
help="Enable SSL/TLS using default CA cert")
|
||||
|
||||
env_map = (
|
||||
("INFLUXDB_URL", "url"),
|
||||
("INFLUXDB_TOKEN", "token"),
|
||||
("INFLUXDB_Bucket", "bucket"),
|
||||
("INFLUXDB_ORG", "org"),
|
||||
("INFLUXDB_SSL", "verify_ssl"),
|
||||
)
|
||||
env_defaults = {}
|
||||
for var, opt in env_map:
|
||||
# check both set and not empty string
|
||||
val = os.environ.get(var)
|
||||
if val:
|
||||
if var == "INFLUXDB_SSL" and val == "secure":
|
||||
env_defaults[opt] = True
|
||||
elif var == "INFLUXDB_SSL" and val == "insecure":
|
||||
env_defaults[opt] = False
|
||||
else:
|
||||
env_defaults[opt] = val
|
||||
parser.set_defaults(**env_defaults)
|
||||
|
||||
opts = dish_common.run_arg_parser(parser, need_id=True)
|
||||
|
||||
opts.icargs = {}
|
||||
for key in ["url", "token", "bucket", "org", "verify_ssl"]:
|
||||
val = getattr(opts, key)
|
||||
if val is not None:
|
||||
opts.icargs[key] = val
|
||||
|
||||
if opts.verify_ssl is not None:
|
||||
opts.icargs["ssl"] = True
|
||||
|
||||
return opts
|
||||
|
||||
|
||||
def flush_points(opts, gstate):
|
||||
try:
|
||||
write_api = gstate.influx_client.write_api(write_options=WriteOptions(batch_size=len(gstate.points),
|
||||
flush_interval=10_000,
|
||||
jitter_interval=2_000,
|
||||
retry_interval=5_000,
|
||||
max_retries=5,
|
||||
max_retry_delay=30_000,
|
||||
exponential_base=2))
|
||||
while len(gstate.points) > MAX_BATCH:
|
||||
write_api.write(record=gstate.points[:MAX_BATCH], write_precision=WritePrecision.S, bucket=opts.bucket)
|
||||
if opts.verbose:
|
||||
print("Data points written: " + str(MAX_BATCH))
|
||||
del gstate.points[:MAX_BATCH]
|
||||
|
||||
if gstate.points:
|
||||
write_api.write(record=gstate.points, write_precision=WritePrecision.S, bucket=opts.bucket)
|
||||
if opts.verbose:
|
||||
print("Data points written: " + str(len(gstate.points)))
|
||||
gstate.points.clear()
|
||||
write_api.flush()
|
||||
write_api.close()
|
||||
except Exception as e:
|
||||
dish_common.conn_error(opts, "Failed writing to InfluxDB database: %s", str(e))
|
||||
# If failures persist, don't just use infinite memory. Max queue
|
||||
# is currently 10 days of bulk data, so something is very wrong
|
||||
# if it's ever exceeded.
|
||||
if len(gstate.points) > MAX_QUEUE_LENGTH:
|
||||
logging.error("Max write queue exceeded, discarding data.")
|
||||
del gstate.points[:-MAX_QUEUE_LENGTH]
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def query_counter(opts, gstate, start, end):
|
||||
try:
|
||||
query_api = gstate.influx_client.query_api()
|
||||
print(str(opts.bucket) + ' ' + str(start) + " " + str(end))
|
||||
result = query_api.query('''
|
||||
from(bucket: "{0}")
|
||||
|> range(start: {1}, stop: {2})
|
||||
|> filter(fn: (r) => r["_measurement"] == "{3}")
|
||||
|> filter(fn: (r) => r["_field"] == "counter")
|
||||
|> last()
|
||||
|> yield(name: "last")
|
||||
'''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT)
|
||||
)
|
||||
if result:
|
||||
counter = result[0].records[0]['_value']
|
||||
timestamp = result[0].records[0]['_time']
|
||||
timestamp = time.mktime(timestamp.timetuple())
|
||||
if counter and timestamp:
|
||||
return int(counter), int(timestamp)
|
||||
except TypeError as e:
|
||||
logging.error(
|
||||
"Skipping resumption from prior counter value. Reported error was: %s", str(e))
|
||||
|
||||
return None, 0
|
||||
|
||||
|
||||
def sync_timebase(opts, gstate):
|
||||
try:
|
||||
db_counter, db_timestamp = query_counter(opts, gstate, gstate.start_timestamp, gstate.timestamp)
|
||||
except Exception as e:
|
||||
# could be temporary outage, so try again next time
|
||||
dish_common.conn_error(opts, "Failed querying InfluxDB for prior count: %s", str(e))
|
||||
return
|
||||
gstate.timebase_synced = True
|
||||
|
||||
if db_counter and gstate.start_counter <= db_counter:
|
||||
del gstate.deferred_points[:db_counter - gstate.start_counter]
|
||||
if gstate.deferred_points:
|
||||
delta_timestamp = db_timestamp - (gstate.deferred_points[0]["time"] - 1)
|
||||
# to prevent +/- 1 second timestamp drift when the script restarts,
|
||||
# if time base is within 2 seconds of that of the last sample in
|
||||
# the database, correct back to that time base
|
||||
if delta_timestamp == 0:
|
||||
if opts.verbose:
|
||||
print("Exactly synced with database time base")
|
||||
elif -2 <= delta_timestamp <= 2:
|
||||
if opts.verbose:
|
||||
print("Replacing with existing time base: {0} -> {1}".format(
|
||||
db_counter, datetime.fromtimestamp(db_timestamp, tz=timezone.utc)))
|
||||
for point in gstate.deferred_points:
|
||||
db_timestamp += 1
|
||||
if point["time"] + delta_timestamp == db_timestamp:
|
||||
point["time"] = db_timestamp
|
||||
else:
|
||||
# lost time sync when recording data, leave the rest
|
||||
break
|
||||
else:
|
||||
gstate.timestamp = db_timestamp
|
||||
else:
|
||||
if opts.verbose:
|
||||
print("Database time base out of sync by {0} seconds".format(delta_timestamp))
|
||||
|
||||
gstate.points.extend(gstate.deferred_points)
|
||||
gstate.deferred_points.clear()
|
||||
|
||||
|
||||
def loop_body(opts, gstate):
|
||||
fields = {"status": {}, "ping_stats": {}, "usage": {}}
|
||||
|
||||
def cb_add_item(key, val, category):
|
||||
fields[category][key] = val
|
||||
|
||||
def cb_add_sequence(key, val, category, start):
|
||||
for i, subval in enumerate(val, start=start):
|
||||
fields[category]["{0}_{1}".format(key, i)] = subval
|
||||
|
||||
def cb_add_bulk(bulk, count, timestamp, counter):
|
||||
if gstate.start_timestamp is None:
|
||||
gstate.start_timestamp = timestamp
|
||||
gstate.start_counter = counter
|
||||
points = gstate.points if gstate.timebase_synced else gstate.deferred_points
|
||||
for i in range(count):
|
||||
timestamp += 1
|
||||
points.append({
|
||||
"measurement": BULK_MEASUREMENT,
|
||||
"tags": {
|
||||
"id": gstate.dish_id
|
||||
},
|
||||
"time": timestamp,
|
||||
"fields": {key: val[i] for key, val in bulk.items() if val[i] is not None},
|
||||
})
|
||||
if points:
|
||||
# save off counter value for script restart
|
||||
points[-1]["fields"]["counter"] = counter + count
|
||||
|
||||
#now = time.time()
|
||||
# work with UTC here
|
||||
now = time.mktime(datetime.utcnow().timetuple())
|
||||
rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk)
|
||||
if rc:
|
||||
return rc
|
||||
|
||||
for category in fields:
|
||||
if fields[category]:
|
||||
gstate.points.append({
|
||||
"measurement": "spacex.starlink.user_terminal." + category,
|
||||
"tags": {
|
||||
"id": gstate.dish_id
|
||||
},
|
||||
"time": int(now),
|
||||
"fields": fields[category],
|
||||
})
|
||||
|
||||
# This is here and not before the points being processed because if the
|
||||
# query previously failed, there will be points that were processed in
|
||||
# a prior loop. This avoids having to handle that as a special case.
|
||||
if opts.bulk_mode and not gstate.timebase_synced:
|
||||
sync_timebase(opts, gstate)
|
||||
|
||||
if opts.verbose:
|
||||
print("Data points queued: " + str(len(gstate.points)))
|
||||
|
||||
if len(gstate.points) >= FLUSH_LIMIT:
|
||||
return flush_points(opts, gstate)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
opts = parse_args()
|
||||
|
||||
logging.basicConfig(format="%(levelname)s: %(message)s")
|
||||
|
||||
gstate = dish_common.GlobalState(target=opts.target)
|
||||
gstate.points = []
|
||||
gstate.deferred_points = []
|
||||
gstate.timebase_synced = opts.skip_query
|
||||
gstate.start_timestamp = None
|
||||
gstate.start_counter = None
|
||||
|
||||
if "verify_ssl" in opts.icargs and not opts.icargs["verify_ssl"]:
|
||||
# user has explicitly said be insecure, so don't warn about it
|
||||
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
|
||||
|
||||
signal.signal(signal.SIGTERM, handle_sigterm)
|
||||
try:
|
||||
gstate.influx_client = InfluxDBClient(**opts.icargs)
|
||||
except TypeError as _err:
|
||||
print('Error while creating influx client: ' + str(_err))
|
||||
|
||||
rc = 0
|
||||
try:
|
||||
next_loop = time.monotonic()
|
||||
while True:
|
||||
rc = loop_body(opts, gstate)
|
||||
if opts.loop_interval > 0.0:
|
||||
now = time.monotonic()
|
||||
next_loop = max(next_loop + opts.loop_interval, now)
|
||||
time.sleep(next_loop - now)
|
||||
else:
|
||||
break
|
||||
except Terminated:
|
||||
pass
|
||||
finally:
|
||||
if gstate.points:
|
||||
rc = flush_points(opts, gstate)
|
||||
gstate.influx_client.close()
|
||||
gstate.shutdown()
|
||||
|
||||
sys.exit(rc)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in a new issue