diff --git a/dish_grpc_influx2.py b/dish_grpc_influx2.py index 5c8c5e9..efd482f 100644 --- a/dish_grpc_influx2.py +++ b/dish_grpc_influx2.py @@ -71,19 +71,14 @@ def parse_args(): help="Skip querying for prior sample write point in bulk mode") group.add_argument("-C", "--ca-cert", - dest="verify_ssl", - help="Enable SSL/TLS using specified CA cert to verify broker", + dest="ssl_ca_cert", + help="Use specified CA cert to verify HTTPS server", metavar="FILENAME") group.add_argument("-I", "--insecure", action="store_false", dest="verify_ssl", - help="Enable SSL/TLS but disable certificate verification (INSECURE!)") - group.add_argument("-S", - "--secure", - action="store_true", - dest="verify_ssl", - help="Enable SSL/TLS using default CA cert") + help="Disable certificate verification of HTTPS server (INSECURE!)") env_map = ( ("INFLUXDB_URL", "url"), @@ -97,10 +92,13 @@ def parse_args(): # check both set and not empty string val = os.environ.get(var) if val: - if var == "INFLUXDB_SSL" and val == "secure": - env_defaults[opt] = True - elif var == "INFLUXDB_SSL" and val == "insecure": - env_defaults[opt] = False + if var == "INFLUXDB_SSL": + if val == "insecure": + env_defaults[opt] = False + elif val == "secure": + env_defaults[opt] = True + else: + env_defaults["ssl_ca_cert"] = val else: env_defaults[opt] = val parser.set_defaults(**env_defaults) @@ -108,13 +106,13 @@ def parse_args(): opts = dish_common.run_arg_parser(parser, need_id=True) opts.icargs = {} - for key in ["url", "token", "bucket", "org", "verify_ssl"]: + for key in ["url", "token", "bucket", "org", "verify_ssl", "ssl_ca_cert"]: val = getattr(opts, key) if val is not None: opts.icargs[key] = val - if opts.verify_ssl is not None: - opts.icargs["ssl"] = True + if (not opts.verify_ssl or opts.ssl_ca_cert is not None) and not opts.url.lower().startswith("https:"): + parser.error("SSL options only apply to HTTPS URLs") return opts @@ -155,27 +153,21 @@ def flush_points(opts, gstate): def query_counter(opts, gstate, start, end): - try: - query_api = gstate.influx_client.query_api() - print(str(opts.bucket) + ' ' + str(start) + " " + str(end)) - result = query_api.query(''' - from(bucket: "{0}") - |> range(start: {1}, stop: {2}) - |> filter(fn: (r) => r["_measurement"] == "{3}") - |> filter(fn: (r) => r["_field"] == "counter") - |> last() - |> yield(name: "last") - '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) - ) - if result: - counter = result[0].records[0]['_value'] - timestamp = result[0].records[0]['_time'] - timestamp = time.mktime(timestamp.timetuple()) - if counter and timestamp: - return int(counter), int(timestamp) - except TypeError as e: - logging.error( - "Skipping resumption from prior counter value. Reported error was: %s", str(e)) + query_api = gstate.influx_client.query_api() + result = query_api.query(''' + from(bucket: "{0}") + |> range(start: {1}, stop: {2}) + |> filter(fn: (r) => r["_measurement"] == "{3}") + |> filter(fn: (r) => r["_field"] == "counter") + |> last() + |> yield(name: "last") + '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) + ) + if result: + counter = result[0].records[0]['_value'] + timestamp = result[0].records[0]['_time'].timestamp() + if counter and timestamp: + return int(counter), int(timestamp) return None, 0 @@ -250,8 +242,6 @@ def loop_body(opts, gstate, shutdown=False): points[-1]["fields"]["counter"] = counter + count now = time.time() - # work with UTC here - # now = time.mktime(datetime.utcnow().timetuple()) rc = dish_common.get_data(opts, gstate, cb_add_item, @@ -304,10 +294,7 @@ def main(): warnings.filterwarnings("ignore", message="Unverified HTTPS request") signal.signal(signal.SIGTERM, handle_sigterm) - try: - gstate.influx_client = InfluxDBClient(**opts.icargs) - except TypeError as _err: - print('Error while creating influx client: ' + str(_err)) + gstate.influx_client = InfluxDBClient(**opts.icargs) rc = 0 try: