diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py
new file mode 100644
index 0000000..a9f09c4
--- /dev/null
+++ b/dish_grpc_influx2.py
@@ -0,0 +1,330 @@
+#!/usr/bin/python3
+"""Write Starlink user terminal data to an InfluxDB 2.x database.
+
+This script pulls the current status info and/or metrics computed from the
+history data and writes them to the specified InfluxDB 2.x database either
+once or in a periodic loop.
+
+Data will be written into the requested database with the following
+measurement / series names:
+
+: spacex.starlink.user_terminal.status : Current status data
+: spacex.starlink.user_terminal.history : Bulk history data
+: spacex.starlink.user_terminal.ping_stats : Ping history statistics
+: spacex.starlink.user_terminal.usage : Usage history statistics
+
+NOTE: The Starlink user terminal does not include time values with its
+history or status data, so this script uses current system time to compute
+the timestamps it sends to InfluxDB. It is recommended to run this script on
+a host that has its system clock synced via NTP. Otherwise, the timestamps
+may get out of sync with real time.
+"""
+
+from datetime import datetime
+from datetime import timezone
+import logging
+import os
+import signal
+import sys
+import time
+import warnings
+
+from influxdb_client import InfluxDBClient, WriteOptions, WritePrecision
+
+import dish_common
+
+HOST_URL = "localhost:8086"
+BUCKET_DEFAULT = "starlinkstats"
+BULK_MEASUREMENT = "spacex.starlink.user_terminal.history"
+FLUSH_LIMIT = 6
+MAX_BATCH = 5000
+MAX_QUEUE_LENGTH = 864000
+
+
+class Terminated(Exception):
+    pass
+
+
+def handle_sigterm(signum, frame):
+    # Turn SIGTERM into an exception so the main loop can clean up
+    raise Terminated
+
+
+def parse_args():
+    parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 2.x database")
+
+    group = parser.add_argument_group(title="InfluxDB 2.x database options")
+    group.add_argument("-u",
+                       "--url",
+                       default=HOST_URL,
+                       dest="url",
+                       help="URL or IP and port of the InfluxDB 2.x server, default: " + HOST_URL)
+    group.add_argument("-T", "--token", help="Token to access the bucket")
+    group.add_argument("-B",
+                       "--bucket",
+                       default=BUCKET_DEFAULT,
+                       help="Bucket name to use, default: " + BUCKET_DEFAULT)
+    group.add_argument("-O", "--org", help="Organisation name")
+    group.add_argument("-k",
+                       "--skip-query",
+                       action="store_true",
+                       help="Skip querying for prior sample write point in bulk mode")
+    group.add_argument("-C",
+                       "--ca-cert",
+                       dest="verify_ssl",
+                       help="Enable SSL/TLS using specified CA cert to verify server",
+                       metavar="FILENAME")
+    group.add_argument("-I",
+                       "--insecure",
+                       action="store_false",
+                       dest="verify_ssl",
+                       help="Enable SSL/TLS but disable certificate verification (INSECURE!)")
+    group.add_argument("-S",
+                       "--secure",
+                       action="store_true",
+                       dest="verify_ssl",
+                       help="Enable SSL/TLS using default CA cert")
+
+    env_map = (
+        ("INFLUXDB_URL", "url"),
+        ("INFLUXDB_TOKEN", "token"),
+        ("INFLUXDB_BUCKET", "bucket"),
+        ("INFLUXDB_ORG", "org"),
+        ("INFLUXDB_SSL", "verify_ssl"),
+    )
+    env_defaults = {}
+    for var, opt in env_map:
+        # check both set and not empty string
+        val = os.environ.get(var)
+        if val:
+            if var == "INFLUXDB_SSL" and val == "secure":
+                env_defaults[opt] = True
+            elif var == "INFLUXDB_SSL" and val == "insecure":
+                env_defaults[opt] = False
+            else:
+                env_defaults[opt] = val
+    parser.set_defaults(**env_defaults)
+
+    opts = dish_common.run_arg_parser(parser, need_id=True)
+
+    opts.icargs = {}
+    for key in ["url", "token", "bucket", "org", "verify_ssl"]:
+        val = getattr(opts, key)
+        if val is not None:
+            opts.icargs[key] = val
+
+    if isinstance(opts.verify_ssl, str):
+        # --ca-cert supplies a filename; influxdb_client takes the CA cert
+        # file via ssl_ca_cert and expects verify_ssl to be a boolean
+        opts.icargs["ssl_ca_cert"] = opts.icargs.pop("verify_ssl")
+        opts.icargs["verify_ssl"] = True
+
+    return opts
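+
+
+# Example (hypothetical values): the database options can also come from the
+# environment instead of the command line, for instance:
+#
+#   INFLUXDB_URL=https://influxdb.example.com:8086 \
+#   INFLUXDB_TOKEN=my-secret-token \
+#   INFLUXDB_ORG=my-org \
+#   INFLUXDB_BUCKET=starlinkstats \
+#   python3 dish_grpc_influx2.py status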
"bucket", "org", "verify_ssl"]: + val = getattr(opts, key) + if val is not None: + opts.icargs[key] = val + + if opts.verify_ssl is not None: + opts.icargs["ssl"] = True + + return opts + + +def flush_points(opts, gstate): + try: + write_api = gstate.influx_client.write_api(write_options=WriteOptions(batch_size=len(gstate.points), + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2)) + while len(gstate.points) > MAX_BATCH: + write_api.write(record=gstate.points[:MAX_BATCH], write_precision=WritePrecision.S, bucket=opts.bucket) + if opts.verbose: + print("Data points written: " + str(MAX_BATCH)) + del gstate.points[:MAX_BATCH] + + if gstate.points: + write_api.write(record=gstate.points, write_precision=WritePrecision.S, bucket=opts.bucket) + if opts.verbose: + print("Data points written: " + str(len(gstate.points))) + gstate.points.clear() + write_api.flush() + write_api.close() + except Exception as e: + dish_common.conn_error(opts, "Failed writing to InfluxDB database: %s", str(e)) + # If failures persist, don't just use infinite memory. Max queue + # is currently 10 days of bulk data, so something is very wrong + # if it's ever exceeded. + if len(gstate.points) > MAX_QUEUE_LENGTH: + logging.error("Max write queue exceeded, discarding data.") + del gstate.points[:-MAX_QUEUE_LENGTH] + return 1 + + return 0 + + +def query_counter(opts, gstate, start, end): + try: + query_api = gstate.influx_client.query_api() + print(str(opts.bucket) + ' ' + str(start) + " " + str(end)) + result = query_api.query(''' + from(bucket: "{0}") + |> range(start: {1}, stop: {2}) + |> filter(fn: (r) => r["_measurement"] == "{3}") + |> filter(fn: (r) => r["_field"] == "counter") + |> last() + |> yield(name: "last") + '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) + ) + if result: + counter = result[0].records[0]['_value'] + timestamp = result[0].records[0]['_time'] + timestamp = time.mktime(timestamp.timetuple()) + if counter and timestamp: + return int(counter), int(timestamp) + except TypeError as e: + logging.error( + "Skipping resumption from prior counter value. 
+
+
+def sync_timebase(opts, gstate):
+    try:
+        db_counter, db_timestamp = query_counter(opts, gstate, gstate.start_timestamp,
+                                                 gstate.timestamp)
+    except Exception as e:
+        # could be a temporary outage, so try again next time
+        dish_common.conn_error(opts, "Failed querying InfluxDB for prior count: %s", str(e))
+        return
+    gstate.timebase_synced = True
+
+    if db_counter and gstate.start_counter <= db_counter:
+        del gstate.deferred_points[:db_counter - gstate.start_counter]
+        if gstate.deferred_points:
+            delta_timestamp = db_timestamp - (gstate.deferred_points[0]["time"] - 1)
+            # To prevent +/- 1 second timestamp drift when the script restarts,
+            # if the time base is within 2 seconds of that of the last sample
+            # in the database, correct back to that time base.
+            if delta_timestamp == 0:
+                if opts.verbose:
+                    print("Exactly synced with database time base")
+            elif -2 <= delta_timestamp <= 2:
+                if opts.verbose:
+                    print("Replacing with existing time base: {0} -> {1}".format(
+                        db_counter, datetime.fromtimestamp(db_timestamp, tz=timezone.utc)))
+                for point in gstate.deferred_points:
+                    db_timestamp += 1
+                    if point["time"] + delta_timestamp == db_timestamp:
+                        point["time"] = db_timestamp
+                    else:
+                        # lost time sync when recording data, so leave the rest
+                        break
+                else:
+                    gstate.timestamp = db_timestamp
+            else:
+                if opts.verbose:
+                    print("Database time base out of sync by {0} seconds".format(delta_timestamp))
+
+    gstate.points.extend(gstate.deferred_points)
+    gstate.deferred_points.clear()
+
+
+def loop_body(opts, gstate):
+    fields = {"status": {}, "ping_stats": {}, "usage": {}}
+
+    def cb_add_item(key, val, category):
+        fields[category][key] = val
+
+    def cb_add_sequence(key, val, category, start):
+        for i, subval in enumerate(val, start=start):
+            fields[category]["{0}_{1}".format(key, i)] = subval
+
+    def cb_add_bulk(bulk, count, timestamp, counter):
+        if gstate.start_timestamp is None:
+            gstate.start_timestamp = timestamp
+            gstate.start_counter = counter
+        points = gstate.points if gstate.timebase_synced else gstate.deferred_points
+        for i in range(count):
+            timestamp += 1
+            points.append({
+                "measurement": BULK_MEASUREMENT,
+                "tags": {
+                    "id": gstate.dish_id
+                },
+                "time": timestamp,
+                "fields": {key: val[i] for key, val in bulk.items() if val[i] is not None},
+            })
+        if points:
+            # save off counter value for script restart
+            points[-1]["fields"]["counter"] = counter + count
+
+    # time.time() is already a UTC-based Unix timestamp, which is what
+    # InfluxDB expects, so no local time conversion is needed here
+    now = time.time()
+    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk)
+    if rc:
+        return rc
+
+    for category in fields:
+        if fields[category]:
+            gstate.points.append({
+                "measurement": "spacex.starlink.user_terminal." + category,
+                "tags": {
+                    "id": gstate.dish_id
+                },
+                "time": int(now),
+                "fields": fields[category],
+            })
+
+    # This is done here, rather than before the points are processed, because
+    # if the query previously failed, there will be points that were processed
+    # in a prior loop. This avoids having to handle that as a special case.
+    if opts.bulk_mode and not gstate.timebase_synced:
+        sync_timebase(opts, gstate)
+
+    if opts.verbose:
+        print("Data points queued: " + str(len(gstate.points)))
+
+    if len(gstate.points) >= FLUSH_LIMIT:
+        return flush_points(opts, gstate)
+
+    return 0
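+
+
+# For illustration, a queued status point (field values are made up) uses the
+# dictionary record shape that the influxdb_client write API accepts:
+#
+#   {
+#       "measurement": "spacex.starlink.user_terminal.status",
+#       "tags": {"id": "ut01234567890"},
+#       "time": 1649360000,
+#       "fields": {"state": "CONNECTED", "pop_ping_latency_ms": 35.2},
+#   }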
+
+
+def main():
+    opts = parse_args()
+
+    logging.basicConfig(format="%(levelname)s: %(message)s")
+
+    gstate = dish_common.GlobalState(target=opts.target)
+    gstate.points = []
+    gstate.deferred_points = []
+    gstate.timebase_synced = opts.skip_query
+    gstate.start_timestamp = None
+    gstate.start_counter = None
+
+    if "verify_ssl" in opts.icargs and not opts.icargs["verify_ssl"]:
+        # user has explicitly said to be insecure, so don't warn about it
+        warnings.filterwarnings("ignore", message="Unverified HTTPS request")
+
+    signal.signal(signal.SIGTERM, handle_sigterm)
+    try:
+        gstate.influx_client = InfluxDBClient(**opts.icargs)
+    except TypeError as e:
+        # there is no point continuing without a client to write to
+        logging.error("Failed creating InfluxDB client: %s", str(e))
+        sys.exit(1)
+
+    rc = 0
+    try:
+        next_loop = time.monotonic()
+        while True:
+            rc = loop_body(opts, gstate)
+            if opts.loop_interval > 0.0:
+                now = time.monotonic()
+                next_loop = max(next_loop + opts.loop_interval, now)
+                time.sleep(next_loop - now)
+            else:
+                break
+    except Terminated:
+        pass
+    finally:
+        if gstate.points:
+            rc = flush_points(opts, gstate)
+        gstate.influx_client.close()
+        gstate.shutdown()
+
+    sys.exit(rc)
+
+
+if __name__ == '__main__':
+    main()
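+
+# Example (assuming dish_common's standard -t loop interval option): poll and
+# write the status data every 30 seconds until interrupted:
+#
+#   python3 dish_grpc_influx2.py -t 30 status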