Switch generated protobuf module imports around

The way I had them, it was hiding the fact that there was no explicit import for the spacex.api.device.dish_pb2 module.
This commit is contained in:
sparky8512 2021-02-04 20:02:45 -08:00
parent 549a46ae56
commit db83a7f042
2 changed files with 21 additions and 19 deletions

View file

@ -1,16 +1,17 @@
#!/usr/bin/python3 #!/usr/bin/python3
"""Simple example of get_status request use grpc call directly.""" """Simple example of get_status request using grpc call directly."""
import grpc import grpc
import spacex.api.device.device_pb2 from spacex.api.device import device_pb2
import spacex.api.device.device_pb2_grpc from spacex.api.device import device_pb2_grpc
from spacex.api.device import dish_pb2
# Note that if you remove the 'with' clause here, you need to separately # Note that if you remove the 'with' clause here, you need to separately
# call channel.close() when you're done with the gRPC connection. # call channel.close() when you're done with the gRPC connection.
with grpc.insecure_channel("192.168.100.1:9200") as channel: with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) response = stub.Handle(device_pb2.Request(get_status={}))
# Dump everything # Dump everything
print(response) print(response)
@ -20,4 +21,4 @@ print("Software version:", response.dish_get_status.device_info.software_version
# Check if connected # Check if connected
print("Connected" if response.dish_get_status.state == print("Connected" if response.dish_get_status.state ==
spacex.api.device.dish_pb2.DishState.CONNECTED else "Not connected") dish_pb2.DishState.CONNECTED else "Not connected")

View file

@ -300,8 +300,9 @@ import statistics
import grpc import grpc
import spacex.api.device.device_pb2 from spacex.api.device import device_pb2
import spacex.api.device.device_pb2_grpc from spacex.api.device import device_pb2_grpc
from spacex.api.device import dish_pb2
class GrpcError(Exception): class GrpcError(Exception):
@ -352,7 +353,7 @@ def status_field_names():
names, and obstruction detail field names, in that order. names, and obstruction detail field names, in that order.
""" """
alert_names = [] alert_names = []
for field in spacex.api.device.dish_pb2.DishAlerts.DESCRIPTOR.fields: for field in dish_pb2.DishAlerts.DESCRIPTOR.fields:
alert_names.append("alert_" + field.name) alert_names.append("alert_" + field.name)
return [ return [
@ -406,7 +407,7 @@ def status_field_types():
], [ ], [
float, # wedges_fraction_obstructed[] float, # wedges_fraction_obstructed[]
float, # valid_s float, # valid_s
], [bool] * len(spacex.api.device.dish_pb2.DishAlerts.DESCRIPTOR.fields) ], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)
def get_status(context=None): def get_status(context=None):
@ -423,15 +424,15 @@ def get_status(context=None):
""" """
if context is None: if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel: with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) response = stub.Handle(device_pb2.Request(get_status={}))
return response.dish_get_status return response.dish_get_status
while True: while True:
channel, reused = context.get_channel() channel, reused = context.get_channel()
try: try:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) response = stub.Handle(device_pb2.Request(get_status={}))
return response.dish_get_status return response.dish_get_status
except grpc.RpcError: except grpc.RpcError:
context.close() context.close()
@ -495,7 +496,7 @@ def status_data(context=None):
"id": status.device_info.id, "id": status.device_info.id,
"hardware_version": status.device_info.hardware_version, "hardware_version": status.device_info.hardware_version,
"software_version": status.device_info.software_version, "software_version": status.device_info.software_version,
"state": spacex.api.device.dish_pb2.DishState.Name(status.state), "state": dish_pb2.DishState.Name(status.state),
"uptime": status.device_state.uptime_s, "uptime": status.device_state.uptime_s,
"snr": status.snr, "snr": status.snr,
"seconds_to_first_nonempty_slot": status.seconds_to_first_nonempty_slot, "seconds_to_first_nonempty_slot": status.seconds_to_first_nonempty_slot,
@ -680,15 +681,15 @@ def get_history(context=None):
""" """
if context is None: if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel: with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) response = stub.Handle(device_pb2.Request(get_history={}))
return response.dish_get_history return response.dish_get_history
while True: while True:
channel, reused = context.get_channel() channel, reused = context.get_channel()
try: try:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) stub = device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) response = stub.Handle(device_pb2.Request(get_history={}))
return response.dish_get_history return response.dish_get_history
except grpc.RpcError: except grpc.RpcError:
context.close() context.close()