From 0225a9783c06df37811ef7903d59676ba7209d4b Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Sat, 19 Feb 2022 15:33:58 -0800 Subject: [PATCH] Various fixes to the InfluxDB 2.x script This should address all the comments I dropped onto pull request #37. The only significant fix here is that the SSL command line options should all work now, although slightly differently than how they work in the InfluxDB 1.x script. Whether or not SSL is enabled is controlled by the URL, specifically whether it is an https: URL or an http: one, and the options just control the CA cert parameters. --- dish_grpc_influx2.py | 71 ++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py index 5c8c5e9..efd482f 100644 --- a/dish_grpc_influx2.py +++ b/dish_grpc_influx2.py @@ -71,19 +71,14 @@ def parse_args(): help="Skip querying for prior sample write point in bulk mode") group.add_argument("-C", "--ca-cert", - dest="verify_ssl", - help="Enable SSL/TLS using specified CA cert to verify broker", + dest="ssl_ca_cert", + help="Use specified CA cert to verify HTTPS server", metavar="FILENAME") group.add_argument("-I", "--insecure", action="store_false", dest="verify_ssl", - help="Enable SSL/TLS but disable certificate verification (INSECURE!)") - group.add_argument("-S", - "--secure", - action="store_true", - dest="verify_ssl", - help="Enable SSL/TLS using default CA cert") + help="Disable certificate verification of HTTPS server (INSECURE!)") env_map = ( ("INFLUXDB_URL", "url"), @@ -97,10 +92,13 @@ def parse_args(): # check both set and not empty string val = os.environ.get(var) if val: - if var == "INFLUXDB_SSL" and val == "secure": - env_defaults[opt] = True - elif var == "INFLUXDB_SSL" and val == "insecure": - env_defaults[opt] = False + if var == "INFLUXDB_SSL": + if val == "insecure": + env_defaults[opt] = False + elif val == "secure": + env_defaults[opt] = True + else: + env_defaults["ssl_ca_cert"] = val else: env_defaults[opt] = val parser.set_defaults(**env_defaults) @@ -108,13 +106,13 @@ def parse_args(): opts = dish_common.run_arg_parser(parser, need_id=True) opts.icargs = {} - for key in ["url", "token", "bucket", "org", "verify_ssl"]: + for key in ["url", "token", "bucket", "org", "verify_ssl", "ssl_ca_cert"]: val = getattr(opts, key) if val is not None: opts.icargs[key] = val - if opts.verify_ssl is not None: - opts.icargs["ssl"] = True + if (not opts.verify_ssl or opts.ssl_ca_cert is not None) and not opts.url.lower().startswith("https:"): + parser.error("SSL options only apply to HTTPS URLs") return opts @@ -155,27 +153,21 @@ def flush_points(opts, gstate): def query_counter(opts, gstate, start, end): - try: - query_api = gstate.influx_client.query_api() - print(str(opts.bucket) + ' ' + str(start) + " " + str(end)) - result = query_api.query(''' - from(bucket: "{0}") - |> range(start: {1}, stop: {2}) - |> filter(fn: (r) => r["_measurement"] == "{3}") - |> filter(fn: (r) => r["_field"] == "counter") - |> last() - |> yield(name: "last") - '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) - ) - if result: - counter = result[0].records[0]['_value'] - timestamp = result[0].records[0]['_time'] - timestamp = time.mktime(timestamp.timetuple()) - if counter and timestamp: - return int(counter), int(timestamp) - except TypeError as e: - logging.error( - "Skipping resumption from prior counter value. Reported error was: %s", str(e)) + query_api = gstate.influx_client.query_api() + result = query_api.query(''' + from(bucket: "{0}") + |> range(start: {1}, stop: {2}) + |> filter(fn: (r) => r["_measurement"] == "{3}") + |> filter(fn: (r) => r["_field"] == "counter") + |> last() + |> yield(name: "last") + '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) + ) + if result: + counter = result[0].records[0]['_value'] + timestamp = result[0].records[0]['_time'].timestamp() + if counter and timestamp: + return int(counter), int(timestamp) return None, 0 @@ -250,8 +242,6 @@ def loop_body(opts, gstate, shutdown=False): points[-1]["fields"]["counter"] = counter + count now = time.time() - # work with UTC here - # now = time.mktime(datetime.utcnow().timetuple()) rc = dish_common.get_data(opts, gstate, cb_add_item, @@ -304,10 +294,7 @@ def main(): warnings.filterwarnings("ignore", message="Unverified HTTPS request") signal.signal(signal.SIGTERM, handle_sigterm) - try: - gstate.influx_client = InfluxDBClient(**opts.icargs) - except TypeError as _err: - print('Error while creating influx client: ' + str(_err)) + gstate.influx_client = InfluxDBClient(**opts.icargs) rc = 0 try: