Support for overriding dish IP and port

Probably not terribly useful unless someone needs to tunnel through a different network to get to their dish, but it makes testing the dish unreachable case a lot easier. This was complicated a bit by the fact that a channel (and therefor the dish IP and port) is needed to get the list of alert types via reflection due to prior changes.

This exposed some issues with the error message for dish unreachable, so fixed those.
This commit is contained in:
sparky8512 2021-02-15 18:50:22 -08:00
parent 1659133168
commit a4bf2d1625
6 changed files with 122 additions and 86 deletions

View file

@ -43,6 +43,10 @@ def create_arg_parser(output_description, bulk_history=True):
parser.bulk_history = bulk_history
group = parser.add_argument_group(title="General options")
group.add_argument("-g",
"--target",
help="host:port of dish to query, default is the standard IP address "
"and port (192.168.100.1:9200)")
group.add_argument("-h", "--help", action="help", help="Be helpful")
group.add_argument("-t",
"--loop-interval",
@ -129,14 +133,14 @@ def conn_error(opts, msg, *args):
class GlobalState:
"""A class for keeping state across loop iterations."""
def __init__(self):
def __init__(self, target=None):
# counter for bulk_history:
self.counter = None
# counter for history stats:
self.counter_stats = None
self.timestamp = None
self.dish_id = None
self.context = starlink_grpc.ChannelContext()
self.context = starlink_grpc.ChannelContext(target=target)
def shutdown(self):
self.context.close()
@ -183,12 +187,12 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
if "status" in opts.mode:
if opts.need_id and gstate.dish_id is None:
conn_error(opts, "Dish unreachable and ID unknown, so not recording state")
else:
if opts.verbose:
print("Dish unreachable")
if "status" in opts.mode:
add_item("state", "DISH_UNREACHABLE", "status")
return 0
return 1
if opts.verbose:
print("Dish unreachable")
add_item("state", "DISH_UNREACHABLE", "status")
return 0
conn_error(opts, "Failure getting status: %s", str(e))
return 1
if opts.need_id:
gstate.dish_id = status_data["id"]

View file

@ -278,7 +278,7 @@ def main():
logging.basicConfig(format="%(levelname)s: %(message)s")
gstate = dish_common.GlobalState()
gstate = dish_common.GlobalState(target=opts.target)
gstate.points = []
gstate.deferred_points = []
gstate.timebase_synced = opts.skip_query

View file

@ -111,7 +111,7 @@ def main():
logging.basicConfig(format="%(levelname)s: %(message)s")
gstate = dish_common.GlobalState()
gstate = dish_common.GlobalState(target=opts.target)
try:
next_loop = time.monotonic()

View file

@ -148,34 +148,39 @@ def loop_body(opts, gstate):
return rc
def ensure_schema(opts, conn):
def ensure_schema(opts, conn, context):
cur = conn.cursor()
cur.execute("PRAGMA user_version")
version = cur.fetchone()
if version and version[0] == SCHEMA_VERSION:
cur.close()
return
return 0
if not version or not version[0]:
if opts.verbose:
print("Initializing new database")
create_tables(conn, "")
elif version[0] > SCHEMA_VERSION and not opts.force:
logging.error("Cowardly refusing to downgrade from schema version %s", version[0])
raise Terminated
else:
print("Converting from schema version:", version[0])
convert_tables(conn)
cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION))
cur.close()
conn.commit()
try:
if not version or not version[0]:
if opts.verbose:
print("Initializing new database")
create_tables(conn, context, "")
elif version[0] > SCHEMA_VERSION and not opts.force:
logging.error("Cowardly refusing to downgrade from schema version %s", version[0])
return 1
else:
print("Converting from schema version:", version[0])
convert_tables(conn, context)
cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION))
conn.commit()
return 0
except starlink_grpc.GrpcError as e:
dish_common.conn_error(opts, "Failure reflecting status fields: %s", str(e))
return 1
finally:
cur.close()
def create_tables(conn, suffix):
def create_tables(conn, context, suffix):
tables = {}
name_groups = starlink_grpc.status_field_names()
type_groups = starlink_grpc.status_field_types()
name_groups = starlink_grpc.status_field_names(context=context)
type_groups = starlink_grpc.status_field_types(context=context)
tables["status"] = zip(name_groups, type_groups)
name_groups = starlink_grpc.history_stats_field_names()
@ -220,8 +225,8 @@ def create_tables(conn, suffix):
return column_info
def convert_tables(conn):
new_column_info = create_tables(conn, "_new")
def convert_tables(conn, context):
new_column_info = create_tables(conn, context, "_new")
conn.row_factory = sqlite3.Row
old_cur = conn.cursor()
new_cur = conn.cursor()
@ -245,7 +250,7 @@ def main():
logging.basicConfig(format="%(levelname)s: %(message)s")
gstate = dish_common.GlobalState()
gstate = dish_common.GlobalState(target=opts.target)
gstate.points = []
gstate.deferred_points = []
@ -254,7 +259,9 @@ def main():
rc = 0
try:
ensure_schema(opts, gstate.sql_conn)
rc = ensure_schema(opts, gstate.sql_conn, gstate.context)
if rc:
sys.exit(rc)
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)

View file

@ -85,13 +85,18 @@ def print_header(opts):
header.append(name)
if opts.satus_mode:
status_names, obstruct_names, alert_names = starlink_grpc.status_field_names()
context = starlink_grpc.ChannelContext(target=opts.target)
try:
name_groups = starlink_grpc.status_field_names(context=context)
except starlink_grpc.GrpcError as e:
dish_common.conn_error(opts, "Failure reflecting status field names: %s", str(e))
return 1
if "status" in opts.mode:
header_add(status_names)
header_add(name_groups[0])
if "obstruction_detail" in opts.mode:
header_add(obstruct_names)
header_add(name_groups[1])
if "alert_detail" in opts.mode:
header_add(alert_names)
header_add(name_groups[2])
if opts.bulk_mode:
general, bulk = starlink_grpc.history_bulk_field_names()
@ -114,6 +119,7 @@ def print_header(opts):
header_add(usage)
print(",".join(header))
return 0
def loop_body(opts, gstate):
@ -180,10 +186,10 @@ def main():
logging.basicConfig(format="%(levelname)s: %(message)s")
if opts.print_header:
print_header(opts)
sys.exit(0)
rc = print_header(opts)
sys.exit(rc)
gstate = dish_common.GlobalState()
gstate = dish_common.GlobalState(target=opts.target)
try:
next_loop = time.monotonic()

View file

@ -345,9 +345,9 @@ class ChannelContext:
`close()` should be called on the object when it is no longer
in use.
"""
def __init__(self, target="192.168.100.1:9200"):
def __init__(self, target=None):
self.channel = None
self.target = target
self.target = "192.168.100.1:9200" if target is None else target
def get_channel(self):
reused = True
@ -362,19 +362,53 @@ class ChannelContext:
self.channel = None
def status_field_names():
def call_with_channel(function, *args, context=None, **kwargs):
"""Call a function with a channel object.
Args:
function: Function to call with channel as first arg.
args: Additional args to pass to function
context (ChannelContext): Optionally provide a channel for (re)use.
If not set, a new default channel will be used and then closed.
kwargs: Additional keyword args to pass to function.
"""
if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
return function(channel, *args, **kwargs)
while True:
channel, reused = context.get_channel()
try:
return function(channel, *args, **kwargs)
except grpc.RpcError:
context.close()
if not reused:
raise
def status_field_names(context=None):
"""Return the field names of the status data.
Note:
See module level docs regarding brackets in field names.
Args:
context (ChannelContext): Optionally provide a channel for (re)use
with reflection service.
Returns:
A tuple with 3 lists, with status data field names, alert detail field
names, and obstruction detail field names, in that order.
Raises:
GrpcError: No user terminal is currently available to resolve imports
via reflection.
"""
if imports_pending:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
resolve_imports(channel)
try:
call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e:
raise GrpcError(e)
alert_names = []
for field in dish_pb2.DishAlerts.DESCRIPTOR.fields:
alert_names.append("alert_" + field.name)
@ -402,19 +436,29 @@ def status_field_names():
], alert_names
def status_field_types():
def status_field_types(context=None):
"""Return the field types of the status data.
Return the type classes for each field. For sequence types, the type of
element in the sequence is returned, not the type of the sequence.
Args:
context (ChannelContext): Optionally provide a channel for (re)use
with reflection service.
Returns:
A tuple with 3 lists, with status data field types, alert detail field
types, and obstruction detail field types, in that order.
Raises:
GrpcError: No user terminal is currently available to resolve imports
via reflection.
"""
if imports_pending:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
resolve_imports(channel)
try:
call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e:
raise GrpcError(e)
return [
str, # id
str, # hardware_version
@ -450,26 +494,14 @@ def get_status(context=None):
Raises:
grpc.RpcError: Communication or service error.
"""
if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_status={}))
def grpc_call(channel):
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_status={}))
return response.dish_get_status
while True:
channel, reused = context.get_channel()
try:
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_status={}))
return response.dish_get_status
except grpc.RpcError:
context.close()
if not reused:
raise
return call_with_channel(grpc_call, context=context)
def get_id(context=None):
@ -506,8 +538,7 @@ def status_data(context=None):
values, in that order.
Raises:
GrpcError: Failed getting history info from the Starlink user
terminal.
GrpcError: Failed getting status info from the Starlink user terminal.
"""
try:
status = get_status(context)
@ -712,26 +743,14 @@ def get_history(context=None):
Raises:
grpc.RpcError: Communication or service error.
"""
if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_history={}))
def grpc_call(channel):
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_history={}))
return response.dish_get_history
while True:
channel, reused = context.get_channel()
try:
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(device_pb2.Request(get_history={}))
return response.dish_get_history
except grpc.RpcError:
context.close()
if not reused:
raise
return call_with_channel(grpc_call, context=context)
def _compute_sample_range(history, parse_samples, start=None, verbose=False):