Address a bunch of pylint and yapf complaints

I don't necessarily think all of these make the code better or easier to read, but it's easier to just go along with what the tools want, since they do generally make things better.

There should be no functional changes here.
This commit is contained in:
sparky8512 2022-09-14 12:55:50 -07:00
parent 24b8f95792
commit 5cc43f6e1d
12 changed files with 67 additions and 57 deletions

View file

@ -37,7 +37,8 @@ def main():
else: # unstow else: # unstow
request = request_class(dish_stow={"unstow": True}) request = request_class(dish_stow={"unstow": True})
stub = reflector.service_stub_class("SpaceX.API.Device.Device")(channel) stub = reflector.service_stub_class("SpaceX.API.Device.Device")(channel)
response = stub.Handle(request, timeout=10) stub.Handle(request, timeout=10)
# response is just empty message, so ignore it
except grpc.RpcError as e: except grpc.RpcError as e:
if isinstance(e, grpc.Call): if isinstance(e, grpc.Call):
msg = e.details() msg = e.details()
@ -49,5 +50,5 @@ def main():
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -260,8 +260,8 @@ def loop_body(opts, gstate, shutdown=False):
if rc: if rc:
return rc return rc
for category in fields: for category, cat_fields in fields.items():
if fields[category]: if cat_fields:
timestamp = status_ts if category == "status" else hist_ts timestamp = status_ts if category == "status" else hist_ts
gstate.points.append({ gstate.points.append({
"measurement": "spacex.starlink.user_terminal." + category, "measurement": "spacex.starlink.user_terminal." + category,
@ -269,7 +269,7 @@ def loop_body(opts, gstate, shutdown=False):
"id": gstate.dish_id "id": gstate.dish_id
}, },
"time": timestamp, "time": timestamp,
"fields": fields[category], "fields": cat_fields,
}) })
# This is here and not before the points being processed because if the # This is here and not before the points being processed because if the
@ -334,5 +334,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -51,7 +51,8 @@ def handle_sigterm(signum, frame):
def parse_args(): def parse_args():
parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB 2.x database") parser = dish_common.create_arg_parser(
output_description="write it to an InfluxDB 2.x database")
group = parser.add_argument_group(title="InfluxDB 2.x database options") group = parser.add_argument_group(title="InfluxDB 2.x database options")
group.add_argument("-u", group.add_argument("-u",
@ -111,7 +112,8 @@ def parse_args():
if val is not None: if val is not None:
opts.icargs[key] = val opts.icargs[key] = val
if (not opts.verify_ssl or opts.ssl_ca_cert is not None) and not opts.url.lower().startswith("https:"): if (not opts.verify_ssl
or opts.ssl_ca_cert is not None) and not opts.url.lower().startswith("https:"):
parser.error("SSL options only apply to HTTPS URLs") parser.error("SSL options only apply to HTTPS URLs")
return opts return opts
@ -119,7 +121,8 @@ def parse_args():
def flush_points(opts, gstate): def flush_points(opts, gstate):
try: try:
write_api = gstate.influx_client.write_api(write_options=WriteOptions(batch_size=len(gstate.points), write_api = gstate.influx_client.write_api(
write_options=WriteOptions(batch_size=len(gstate.points),
flush_interval=10_000, flush_interval=10_000,
jitter_interval=2_000, jitter_interval=2_000,
retry_interval=5_000, retry_interval=5_000,
@ -127,13 +130,17 @@ def flush_points(opts, gstate):
max_retry_delay=30_000, max_retry_delay=30_000,
exponential_base=2)) exponential_base=2))
while len(gstate.points) > MAX_BATCH: while len(gstate.points) > MAX_BATCH:
write_api.write(record=gstate.points[:MAX_BATCH], write_precision=WritePrecision.S, bucket=opts.bucket) write_api.write(record=gstate.points[:MAX_BATCH],
write_precision=WritePrecision.S,
bucket=opts.bucket)
if opts.verbose: if opts.verbose:
print("Data points written: " + str(MAX_BATCH)) print("Data points written: " + str(MAX_BATCH))
del gstate.points[:MAX_BATCH] del gstate.points[:MAX_BATCH]
if gstate.points: if gstate.points:
write_api.write(record=gstate.points, write_precision=WritePrecision.S, bucket=opts.bucket) write_api.write(record=gstate.points,
write_precision=WritePrecision.S,
bucket=opts.bucket)
if opts.verbose: if opts.verbose:
print("Data points written: " + str(len(gstate.points))) print("Data points written: " + str(len(gstate.points)))
gstate.points.clear() gstate.points.clear()
@ -161,11 +168,10 @@ def query_counter(opts, gstate, start, end):
|> filter(fn: (r) => r["_field"] == "counter") |> filter(fn: (r) => r["_field"] == "counter")
|> last() |> last()
|> yield(name: "last") |> yield(name: "last")
'''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT) '''.format(opts.bucket, str(start), str(end), BULK_MEASUREMENT))
)
if result: if result:
counter = result[0].records[0]['_value'] counter = result[0].records[0]["_value"]
timestamp = result[0].records[0]['_time'].timestamp() timestamp = result[0].records[0]["_time"].timestamp()
if counter and timestamp: if counter and timestamp:
return int(counter), int(timestamp) return int(counter), int(timestamp)
@ -174,7 +180,8 @@ def query_counter(opts, gstate, start, end):
def sync_timebase(opts, gstate): def sync_timebase(opts, gstate):
try: try:
db_counter, db_timestamp = query_counter(opts, gstate, gstate.start_timestamp, gstate.timestamp) db_counter, db_timestamp = query_counter(opts, gstate, gstate.start_timestamp,
gstate.timestamp)
except Exception as e: except Exception as e:
# could be temporary outage, so try again next time # could be temporary outage, so try again next time
dish_common.conn_error(opts, "Failed querying InfluxDB for prior count: %s", str(e)) dish_common.conn_error(opts, "Failed querying InfluxDB for prior count: %s", str(e))
@ -250,8 +257,8 @@ def loop_body(opts, gstate, shutdown=False):
if rc: if rc:
return rc return rc
for category in fields: for category, cat_fields in fields.items():
if fields[category]: if cat_fields:
timestamp = status_ts if category == "status" else hist_ts timestamp = status_ts if category == "status" else hist_ts
gstate.points.append({ gstate.points.append({
"measurement": "spacex.starlink.user_terminal." + category, "measurement": "spacex.starlink.user_terminal." + category,
@ -259,7 +266,7 @@ def loop_body(opts, gstate, shutdown=False):
"id": gstate.dish_id "id": gstate.dish_id
}, },
"time": timestamp, "time": timestamp,
"fields": fields[category], "fields": cat_fields,
}) })
# This is here and not before the points being processed because if the # This is here and not before the points being processed because if the
@ -319,5 +326,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -143,7 +143,7 @@ def loop_body(opts, gstate):
data["dish_{0}".format(category)] = {} data["dish_{0}".format(category)] = {}
# Skip NaN values that occur on startup because they can upset Javascript JSON parsers # Skip NaN values that occur on startup because they can upset Javascript JSON parsers
if not ((type(val) == float) and math.isnan(val)): if not (isinstance(val, float) and math.isnan(val)):
data["dish_{0}".format(category)].update({key: val}) data["dish_{0}".format(category)].update({key: val})
def cb_add_sequence(key, val, category, _): def cb_add_sequence(key, val, category, _):
@ -207,5 +207,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -270,7 +270,7 @@ def convert_tables(conn, context):
",".join(repeat("?", len(new_columns)))) ",".join(repeat("?", len(new_columns))))
new_cur.executemany(sql, (tuple(row[col] for col in new_columns) for row in old_cur)) new_cur.executemany(sql, (tuple(row[col] for col in new_columns) for row in old_cur))
new_cur.execute('DROP TABLE "{0}"'.format(table)) new_cur.execute('DROP TABLE "{0}"'.format(table))
new_cur.execute('ALTER TABLE {0}_new RENAME TO {0}'.format(table)) new_cur.execute('ALTER TABLE "{0}_new" RENAME TO "{0}"'.format(table))
old_cur.close() old_cur.close()
new_cur.close() new_cur.close()
conn.row_factory = None conn.row_factory = None
@ -315,5 +315,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -298,5 +298,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -281,5 +281,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -186,5 +186,5 @@ def main():
sys.exit(rc) sys.exit(rc)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -8,10 +8,8 @@ import grpc
try: try:
from spacex.api.device import device_pb2 from spacex.api.device import device_pb2
from spacex.api.device import device_pb2_grpc from spacex.api.device import device_pb2_grpc
from spacex.api.device import dish_pb2
except ModuleNotFoundError: except ModuleNotFoundError:
print("This script requires the generated gRPC protocol modules. " print("This script requires the generated gRPC protocol modules. See README file for details.",
"See README file for details.",
file=sys.stderr) file=sys.stderr)
sys.exit(1) sys.exit(1)

View file

@ -103,9 +103,8 @@ def loop_body(opts):
print("Protoset:", filename) print("Protoset:", filename)
else: else:
try: try:
outfile = open(filename, mode="xb") with open(filename, mode="xb") as outfile:
outfile.write(protoset) outfile.write(protoset)
outfile.close()
print("New protoset found:", filename) print("New protoset found:", filename)
except FileExistsError: except FileExistsError:
if opts.verbose: if opts.verbose:
@ -139,5 +138,5 @@ def main():
break break
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -84,5 +84,5 @@ def main():
context.close() context.close()
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -613,7 +613,7 @@ def status_field_names(context: Optional[ChannelContext] = None):
try: try:
call_with_channel(resolve_imports, context=context) call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
alert_names = [] alert_names = []
for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: for field in dish_pb2.DishAlerts.DESCRIPTOR.fields:
alert_names.append("alert_" + field.name) alert_names.append("alert_" + field.name)
@ -643,7 +643,7 @@ def status_field_types(context: Optional[ChannelContext] = None):
try: try:
call_with_channel(resolve_imports, context=context) call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
return (_field_types(StatusDict), _field_types(ObstructionDict), return (_field_types(StatusDict), _field_types(ObstructionDict),
[bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields))
@ -688,7 +688,7 @@ def get_id(context: Optional[ChannelContext] = None) -> str:
status = get_status(context) status = get_status(context)
return status.device_info.id return status.device_info.id
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
def status_data( def status_data(
@ -710,7 +710,7 @@ def status_data(
try: try:
status = get_status(context) status = get_status(context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
if status.HasField("outage"): if status.HasField("outage"):
if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE:
@ -732,8 +732,8 @@ def status_data(
if field.number < 65: if field.number < 65:
alert_bits |= (1 if value else 0) << (field.number - 1) alert_bits |= (1 if value else 0) << (field.number - 1)
if (status.obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0 and not if (status.obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0
math.isnan(status.obstruction_stats.avg_prolonged_obstruction_interval_s)): and not math.isnan(status.obstruction_stats.avg_prolonged_obstruction_interval_s)):
obstruction_duration = status.obstruction_stats.avg_prolonged_obstruction_duration_s obstruction_duration = status.obstruction_stats.avg_prolonged_obstruction_duration_s
obstruction_interval = status.obstruction_stats.avg_prolonged_obstruction_interval_s obstruction_interval = status.obstruction_stats.avg_prolonged_obstruction_interval_s
else: else:
@ -834,7 +834,7 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict:
"longitude": None, "longitude": None,
"altitude": None, "altitude": None,
} }
raise GrpcError(e) raise GrpcError(e) from e
return { return {
"latitude": location.lla.lat, "latitude": location.lla.lat,
@ -936,7 +936,10 @@ def get_history(context: Optional[ChannelContext] = None):
return call_with_channel(grpc_call, context=context) return call_with_channel(grpc_call, context=context)
def _compute_sample_range(history, parse_samples: int, start: Optional[int] = None, verbose: bool = False): def _compute_sample_range(history,
parse_samples: int,
start: Optional[int] = None,
verbose: bool = False):
current = int(history.current) current = int(history.current)
samples = len(history.pop_ping_drop_rate) samples = len(history.pop_ping_drop_rate)
@ -984,7 +987,11 @@ def _compute_sample_range(history, parse_samples: int, start: Optional[int] = No
return sample_range, current - start, current return sample_range, current - start, current
def concatenate_history(history1, history2, samples1: int = -1, start1: Optional[int] = None, verbose: bool = False): def concatenate_history(history1,
history2,
samples1: int = -1,
start1: Optional[int] = None,
verbose: bool = False):
"""Append the sample-dependent fields of one history object to another. """Append the sample-dependent fields of one history object to another.
Note: Note:
@ -1088,7 +1095,7 @@ def history_bulk_data(parse_samples: int,
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
sample_range, parsed_samples, current = _compute_sample_range(history, sample_range, parsed_samples, current = _compute_sample_range(history,
parse_samples, parse_samples,
@ -1173,7 +1180,7 @@ def history_stats(
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
sample_range, parsed_samples, current = _compute_sample_range(history, sample_range, parsed_samples, current = _compute_sample_range(history,
parse_samples, parse_samples,
@ -1364,7 +1371,7 @@ def obstruction_map(context: Optional[ChannelContext] = None):
try: try:
map_data = get_obstruction_map(context) map_data = get_obstruction_map(context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
cols = map_data.num_cols cols = map_data.num_cols
return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols))
@ -1388,12 +1395,11 @@ def reboot(context: Optional[ChannelContext] = None) -> None:
stub = device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
stub.Handle(device_pb2.Request(reboot={}), timeout=REQUEST_TIMEOUT) stub.Handle(device_pb2.Request(reboot={}), timeout=REQUEST_TIMEOUT)
# response is empty message in this case, so just ignore it # response is empty message in this case, so just ignore it
return None
try: try:
call_with_channel(grpc_call, context=context) call_with_channel(grpc_call, context=context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e
def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = None) -> None: def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = None) -> None:
@ -1416,9 +1422,8 @@ def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = Non
stub = device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
stub.Handle(device_pb2.Request(dish_stow={"unstow": unstow}), timeout=REQUEST_TIMEOUT) stub.Handle(device_pb2.Request(dish_stow={"unstow": unstow}), timeout=REQUEST_TIMEOUT)
# response is empty message in this case, so just ignore it # response is empty message in this case, so just ignore it
return None
try: try:
call_with_channel(grpc_call, context=context) call_with_channel(grpc_call, context=context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) raise GrpcError(e) from e