diff --git a/dish_common.py b/dish_common.py index 5063f28..3990265 100644 --- a/dish_common.py +++ b/dish_common.py @@ -43,6 +43,10 @@ def create_arg_parser(output_description, bulk_history=True): parser.bulk_history = bulk_history group = parser.add_argument_group(title="General options") + group.add_argument("-g", + "--target", + help="host:port of dish to query, default is the standard IP address " + "and port (192.168.100.1:9200)") group.add_argument("-h", "--help", action="help", help="Be helpful") group.add_argument("-t", "--loop-interval", @@ -129,14 +133,14 @@ def conn_error(opts, msg, *args): class GlobalState: """A class for keeping state across loop iterations.""" - def __init__(self): + def __init__(self, target=None): # counter for bulk_history: self.counter = None # counter for history stats: self.counter_stats = None self.timestamp = None self.dish_id = None - self.context = starlink_grpc.ChannelContext() + self.context = starlink_grpc.ChannelContext(target=target) def shutdown(self): self.context.close() @@ -183,12 +187,12 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): if "status" in opts.mode: if opts.need_id and gstate.dish_id is None: conn_error(opts, "Dish unreachable and ID unknown, so not recording state") - else: - if opts.verbose: - print("Dish unreachable") - if "status" in opts.mode: - add_item("state", "DISH_UNREACHABLE", "status") - return 0 + return 1 + if opts.verbose: + print("Dish unreachable") + add_item("state", "DISH_UNREACHABLE", "status") + return 0 + conn_error(opts, "Failure getting status: %s", str(e)) return 1 if opts.need_id: gstate.dish_id = status_data["id"] diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py index 68d26db..de46d50 100644 --- a/dish_grpc_influx.py +++ b/dish_grpc_influx.py @@ -278,7 +278,7 @@ def main(): logging.basicConfig(format="%(levelname)s: %(message)s") - gstate = dish_common.GlobalState() + gstate = dish_common.GlobalState(target=opts.target) gstate.points = [] gstate.deferred_points = [] gstate.timebase_synced = opts.skip_query diff --git a/dish_grpc_mqtt.py b/dish_grpc_mqtt.py index 6448d6c..24563d7 100644 --- a/dish_grpc_mqtt.py +++ b/dish_grpc_mqtt.py @@ -111,7 +111,7 @@ def main(): logging.basicConfig(format="%(levelname)s: %(message)s") - gstate = dish_common.GlobalState() + gstate = dish_common.GlobalState(target=opts.target) try: next_loop = time.monotonic() diff --git a/dish_grpc_sqlite.py b/dish_grpc_sqlite.py index fc7781d..9b4e700 100644 --- a/dish_grpc_sqlite.py +++ b/dish_grpc_sqlite.py @@ -148,34 +148,39 @@ def loop_body(opts, gstate): return rc -def ensure_schema(opts, conn): +def ensure_schema(opts, conn, context): cur = conn.cursor() cur.execute("PRAGMA user_version") version = cur.fetchone() if version and version[0] == SCHEMA_VERSION: cur.close() - return + return 0 - if not version or not version[0]: - if opts.verbose: - print("Initializing new database") - create_tables(conn, "") - elif version[0] > SCHEMA_VERSION and not opts.force: - logging.error("Cowardly refusing to downgrade from schema version %s", version[0]) - raise Terminated - else: - print("Converting from schema version:", version[0]) - convert_tables(conn) - - cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION)) - cur.close() - conn.commit() + try: + if not version or not version[0]: + if opts.verbose: + print("Initializing new database") + create_tables(conn, context, "") + elif version[0] > SCHEMA_VERSION and not opts.force: + logging.error("Cowardly refusing to downgrade from schema version %s", version[0]) + return 1 + else: + print("Converting from schema version:", version[0]) + convert_tables(conn, context) + cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION)) + conn.commit() + return 0 + except starlink_grpc.GrpcError as e: + dish_common.conn_error(opts, "Failure reflecting status fields: %s", str(e)) + return 1 + finally: + cur.close() -def create_tables(conn, suffix): +def create_tables(conn, context, suffix): tables = {} - name_groups = starlink_grpc.status_field_names() - type_groups = starlink_grpc.status_field_types() + name_groups = starlink_grpc.status_field_names(context=context) + type_groups = starlink_grpc.status_field_types(context=context) tables["status"] = zip(name_groups, type_groups) name_groups = starlink_grpc.history_stats_field_names() @@ -220,8 +225,8 @@ def create_tables(conn, suffix): return column_info -def convert_tables(conn): - new_column_info = create_tables(conn, "_new") +def convert_tables(conn, context): + new_column_info = create_tables(conn, context, "_new") conn.row_factory = sqlite3.Row old_cur = conn.cursor() new_cur = conn.cursor() @@ -245,7 +250,7 @@ def main(): logging.basicConfig(format="%(levelname)s: %(message)s") - gstate = dish_common.GlobalState() + gstate = dish_common.GlobalState(target=opts.target) gstate.points = [] gstate.deferred_points = [] @@ -254,7 +259,9 @@ def main(): rc = 0 try: - ensure_schema(opts, gstate.sql_conn) + rc = ensure_schema(opts, gstate.sql_conn, gstate.context) + if rc: + sys.exit(rc) next_loop = time.monotonic() while True: rc = loop_body(opts, gstate) diff --git a/dish_grpc_text.py b/dish_grpc_text.py index 333c69d..849114d 100644 --- a/dish_grpc_text.py +++ b/dish_grpc_text.py @@ -85,13 +85,18 @@ def print_header(opts): header.append(name) if opts.satus_mode: - status_names, obstruct_names, alert_names = starlink_grpc.status_field_names() + context = starlink_grpc.ChannelContext(target=opts.target) + try: + name_groups = starlink_grpc.status_field_names(context=context) + except starlink_grpc.GrpcError as e: + dish_common.conn_error(opts, "Failure reflecting status field names: %s", str(e)) + return 1 if "status" in opts.mode: - header_add(status_names) + header_add(name_groups[0]) if "obstruction_detail" in opts.mode: - header_add(obstruct_names) + header_add(name_groups[1]) if "alert_detail" in opts.mode: - header_add(alert_names) + header_add(name_groups[2]) if opts.bulk_mode: general, bulk = starlink_grpc.history_bulk_field_names() @@ -114,6 +119,7 @@ def print_header(opts): header_add(usage) print(",".join(header)) + return 0 def loop_body(opts, gstate): @@ -180,10 +186,10 @@ def main(): logging.basicConfig(format="%(levelname)s: %(message)s") if opts.print_header: - print_header(opts) - sys.exit(0) + rc = print_header(opts) + sys.exit(rc) - gstate = dish_common.GlobalState() + gstate = dish_common.GlobalState(target=opts.target) try: next_loop = time.monotonic() diff --git a/starlink_grpc.py b/starlink_grpc.py index 98c2d3c..cb56621 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -345,9 +345,9 @@ class ChannelContext: `close()` should be called on the object when it is no longer in use. """ - def __init__(self, target="192.168.100.1:9200"): + def __init__(self, target=None): self.channel = None - self.target = target + self.target = "192.168.100.1:9200" if target is None else target def get_channel(self): reused = True @@ -362,19 +362,53 @@ class ChannelContext: self.channel = None -def status_field_names(): +def call_with_channel(function, *args, context=None, **kwargs): + """Call a function with a channel object. + + Args: + function: Function to call with channel as first arg. + args: Additional args to pass to function + context (ChannelContext): Optionally provide a channel for (re)use. + If not set, a new default channel will be used and then closed. + kwargs: Additional keyword args to pass to function. + """ + if context is None: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + return function(channel, *args, **kwargs) + + while True: + channel, reused = context.get_channel() + try: + return function(channel, *args, **kwargs) + except grpc.RpcError: + context.close() + if not reused: + raise + + +def status_field_names(context=None): """Return the field names of the status data. Note: See module level docs regarding brackets in field names. + Args: + context (ChannelContext): Optionally provide a channel for (re)use + with reflection service. + Returns: A tuple with 3 lists, with status data field names, alert detail field names, and obstruction detail field names, in that order. + + Raises: + GrpcError: No user terminal is currently available to resolve imports + via reflection. """ if imports_pending: - with grpc.insecure_channel("192.168.100.1:9200") as channel: - resolve_imports(channel) + try: + call_with_channel(resolve_imports, context=context) + except grpc.RpcError as e: + raise GrpcError(e) alert_names = [] for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: alert_names.append("alert_" + field.name) @@ -402,19 +436,29 @@ def status_field_names(): ], alert_names -def status_field_types(): +def status_field_types(context=None): """Return the field types of the status data. Return the type classes for each field. For sequence types, the type of element in the sequence is returned, not the type of the sequence. + Args: + context (ChannelContext): Optionally provide a channel for (re)use + with reflection service. + Returns: A tuple with 3 lists, with status data field types, alert detail field types, and obstruction detail field types, in that order. + + Raises: + GrpcError: No user terminal is currently available to resolve imports + via reflection. """ if imports_pending: - with grpc.insecure_channel("192.168.100.1:9200") as channel: - resolve_imports(channel) + try: + call_with_channel(resolve_imports, context=context) + except grpc.RpcError as e: + raise GrpcError(e) return [ str, # id str, # hardware_version @@ -450,26 +494,14 @@ def get_status(context=None): Raises: grpc.RpcError: Communication or service error. """ - if context is None: - with grpc.insecure_channel("192.168.100.1:9200") as channel: - if imports_pending: - resolve_imports(channel) - stub = device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(device_pb2.Request(get_status={})) + def grpc_call(channel): + if imports_pending: + resolve_imports(channel) + stub = device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(device_pb2.Request(get_status={})) return response.dish_get_status - while True: - channel, reused = context.get_channel() - try: - if imports_pending: - resolve_imports(channel) - stub = device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(device_pb2.Request(get_status={})) - return response.dish_get_status - except grpc.RpcError: - context.close() - if not reused: - raise + return call_with_channel(grpc_call, context=context) def get_id(context=None): @@ -506,8 +538,7 @@ def status_data(context=None): values, in that order. Raises: - GrpcError: Failed getting history info from the Starlink user - terminal. + GrpcError: Failed getting status info from the Starlink user terminal. """ try: status = get_status(context) @@ -712,26 +743,14 @@ def get_history(context=None): Raises: grpc.RpcError: Communication or service error. """ - if context is None: - with grpc.insecure_channel("192.168.100.1:9200") as channel: - if imports_pending: - resolve_imports(channel) - stub = device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(device_pb2.Request(get_history={})) + def grpc_call(channel): + if imports_pending: + resolve_imports(channel) + stub = device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(device_pb2.Request(get_history={})) return response.dish_get_history - while True: - channel, reused = context.get_channel() - try: - if imports_pending: - resolve_imports(channel) - stub = device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(device_pb2.Request(get_history={})) - return response.dish_get_history - except grpc.RpcError: - context.close() - if not reused: - raise + return call_with_channel(grpc_call, context=context) def _compute_sample_range(history, parse_samples, start=None, verbose=False):