Various fixes to the InfluxDB 2.x script

This should address all the comments I dropped onto pull request #37.

The only significant fix here is that the SSL command line options should all work now, although slightly differently than how they work in the InfluxDB 1.x script. Whether or not SSL is enabled is controlled by the URL, specifically whether it is an https: URL or an http: one, and the options just control the CA cert parameters.
This commit is contained in:
sparky8512 2022-02-19 15:33:58 -08:00
parent 52dc5ec79d
commit 0225a9783c

View file

@ -71,19 +71,14 @@ def parse_args():
help="Skip querying for prior sample write point in bulk mode") help="Skip querying for prior sample write point in bulk mode")
group.add_argument("-C", group.add_argument("-C",
"--ca-cert", "--ca-cert",
dest="verify_ssl", dest="ssl_ca_cert",
help="Enable SSL/TLS using specified CA cert to verify broker", help="Use specified CA cert to verify HTTPS server",
metavar="FILENAME") metavar="FILENAME")
group.add_argument("-I", group.add_argument("-I",
"--insecure", "--insecure",
action="store_false", action="store_false",
dest="verify_ssl", dest="verify_ssl",
help="Enable SSL/TLS but disable certificate verification (INSECURE!)") help="Disable certificate verification of HTTPS server (INSECURE!)")
group.add_argument("-S",
"--secure",
action="store_true",
dest="verify_ssl",
help="Enable SSL/TLS using default CA cert")
env_map = ( env_map = (
("INFLUXDB_URL", "url"), ("INFLUXDB_URL", "url"),
@ -97,10 +92,13 @@ def parse_args():
# check both set and not empty string # check both set and not empty string
val = os.environ.get(var) val = os.environ.get(var)
if val: if val:
if var == "INFLUXDB_SSL" and val == "secure": if var == "INFLUXDB_SSL":
env_defaults[opt] = True if val == "insecure":
elif var == "INFLUXDB_SSL" and val == "insecure": env_defaults[opt] = False
env_defaults[opt] = False elif val == "secure":
env_defaults[opt] = True
else:
env_defaults["ssl_ca_cert"] = val
else: else:
env_defaults[opt] = val env_defaults[opt] = val
parser.set_defaults(**env_defaults) parser.set_defaults(**env_defaults)
@ -108,13 +106,13 @@ def parse_args():
opts = dish_common.run_arg_parser(parser, need_id=True) opts = dish_common.run_arg_parser(parser, need_id=True)
opts.icargs = {} opts.icargs = {}
for key in ["url", "token", "bucket", "org", "verify_ssl"]: for key in ["url", "token", "bucket", "org", "verify_ssl", "ssl_ca_cert"]:
val = getattr(opts, key) val = getattr(opts, key)
if val is not None: if val is not None:
opts.icargs[key] = val opts.icargs[key] = val
if opts.verify_ssl is not None: if (not opts.verify_ssl or opts.ssl_ca_cert is not None) and not opts.url.lower().startswith("https:"):
opts.icargs["ssl"] = True parser.error("SSL options only apply to HTTPS URLs")
return opts return opts
@ -155,27 +153,21 @@ def flush_points(opts, gstate):
def query_counter(opts, gstate, start, end): def query_counter(opts, gstate, start, end):
try: query_api = gstate.influx_client.query_api()
query_api = gstate.influx_client.query_api() result = query_api.query('''
print(str(opts.bucket) + ' ' + str(start) + " " + str(end)) from(bucket: "{0}")
result = query_api.query(''' |> range(start: {1}, stop: {2})
from(bucket: "{0}") |> filter(fn: (r) => r["_measurement"] == "{3}")
|> range(start: {1}, stop: {2}) |> filter(fn: (r) => r["_field"] == "counter")
|> filter(fn: (r) => r["_measurement"] == "{3}") |> last()
|> filter(fn: (r) => r["_field"] == "counter") |> yield(name: "last")
|> last() '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT)
|> yield(name: "last") )
'''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) if result:
) counter = result[0].records[0]['_value']
if result: timestamp = result[0].records[0]['_time'].timestamp()
counter = result[0].records[0]['_value'] if counter and timestamp:
timestamp = result[0].records[0]['_time'] return int(counter), int(timestamp)
timestamp = time.mktime(timestamp.timetuple())
if counter and timestamp:
return int(counter), int(timestamp)
except TypeError as e:
logging.error(
"Skipping resumption from prior counter value. Reported error was: %s", str(e))
return None, 0 return None, 0
@ -250,8 +242,6 @@ def loop_body(opts, gstate, shutdown=False):
points[-1]["fields"]["counter"] = counter + count points[-1]["fields"]["counter"] = counter + count
now = time.time() now = time.time()
# work with UTC here
# now = time.mktime(datetime.utcnow().timetuple())
rc = dish_common.get_data(opts, rc = dish_common.get_data(opts,
gstate, gstate,
cb_add_item, cb_add_item,
@ -304,10 +294,7 @@ def main():
warnings.filterwarnings("ignore", message="Unverified HTTPS request") warnings.filterwarnings("ignore", message="Unverified HTTPS request")
signal.signal(signal.SIGTERM, handle_sigterm) signal.signal(signal.SIGTERM, handle_sigterm)
try: gstate.influx_client = InfluxDBClient(**opts.icargs)
gstate.influx_client = InfluxDBClient(**opts.icargs)
except TypeError as _err:
print('Error while creating influx client: ' + str(_err))
rc = 0 rc = 0
try: try: