Keep grpc channel open across RPC calls

This restores the functionality that the InfluxDB status polling script had whereby instead of using a new grpc Channel for each RPC call, it would keep one open and reuse it, retrying one time if it ever fails, which can happen if the connection is lost between calls. Now all the grpc scripts have this functionality.

Also, hedge a little bit in the descriptions for what the obstruction detail fields means, given that I'm not sure my assumptions there are correct.
This commit is contained in:
sparky8512 2021-01-30 11:24:17 -08:00
parent 45b563f91a
commit 68c1413dbd
5 changed files with 133 additions and 48 deletions

View file

@ -117,11 +117,15 @@ def conn_error(opts, msg, *args):
class GlobalState:
"""A mostly empty class for keeping state across loop iterations."""
"""A class for keeping state across loop iterations."""
def __init__(self):
self.counter = None
self.timestamp = None
self.dish_id = None
self.context = starlink_grpc.ChannelContext()
def shutdown(self):
self.context.close()
def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
@ -159,7 +163,8 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
if opts.satus_mode:
try:
status_data, obstruct_detail, alert_detail = starlink_grpc.status_data()
status_data, obstruct_detail, alert_detail = starlink_grpc.status_data(
context=gstate.context)
except starlink_grpc.GrpcError as e:
if "status" in opts.mode:
if opts.need_id and gstate.dish_id is None:
@ -182,7 +187,7 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
add_data(alert_detail, "status")
elif opts.need_id and gstate.dish_id is None:
try:
gstate.dish_id = starlink_grpc.get_id()
gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting dish ID: %s", str(e))
return 1
@ -191,7 +196,9 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
if opts.ping_mode:
try:
general, ping, runlen = starlink_grpc.history_ping_stats(opts.samples, opts.verbose)
general, ping, runlen = starlink_grpc.history_ping_stats(opts.samples,
opts.verbose,
context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting ping stats: %s", str(e))
return 1
@ -209,7 +216,8 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
try:
general, bulk = starlink_grpc.history_bulk_data(parse_samples,
start=start,
verbose=opts.verbose)
verbose=opts.verbose,
context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting history: %s", str(e))
return 1

View file

@ -313,6 +313,7 @@ def main():
if gstate.points:
rc = flush_points(opts, gstate)
gstate.influx_client.close()
gstate.shutdown()
sys.exit(rc)

View file

@ -113,15 +113,18 @@ def main():
gstate = dish_common.GlobalState()
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
try:
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
finally:
gstate.shutdown()
sys.exit(rc)

View file

@ -165,15 +165,18 @@ def main():
gstate = dish_common.GlobalState()
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
try:
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
finally:
gstate.shutdown()
sys.exit(rc)

View file

@ -52,9 +52,9 @@ This group holds information about the current state of the user terminal.
indicates the alert is active. See alert detail status data for which bits
correspond with each alert, or to get individual alert flags instead of a
combined bit mask.
: **fraction_obstructed** : The fraction of total area that the user terminal
has determined to be obstructed between it and the satellites with which
it communicates.
: **fraction_obstructed** : The fraction of total area (or possibly fraction
of time?) that the user terminal has determined to be obstructed between
it and the satellites with which it communicates.
: **currently_obstructed** : Most recent sample value. See bulk history data
for detail.
: **seconds_obstructed** : The amount of time within the history buffer
@ -71,10 +71,13 @@ user terminal has determined to be obstructed.
: **wedges_fraction_obstructed** : A 12 element sequence. Each element
represents a 30 degree wedge of area and its value indicates the fraction
of area within that wedge that the user terminal has determined to be
obstructed between it and the satellites with which it communicates. The
values are expressed as a fraction of total area, not a fraction of the
wedge, so max value for each element should be 1/12.
of area (time?) within that wedge that the user terminal has determined to
be obstructed between it and the satellites with which it communicates.
The values are expressed as a fraction of total, not a fraction of the
wedge, so max value for each element should be 1/12. The first element in
the sequence represents the wedge that spans exactly North to 30 degrees
East of North, and subsequent wedges rotate 30 degrees further in the same
direction. (It's not clear if this will hold true at all latitudes.)
See also *fraction_obstructed* in general status data, which should equal the
sum of all *wedges_fraction_obstructed* elements.
@ -229,6 +232,25 @@ class GrpcError(Exception):
super().__init__(msg, *args, **kwargs)
class ChannelContext:
"""A wrapper for reusing an open grpc Channel across calls."""
def __init__(self, target="192.168.100.1:9200"):
self.channel = None
self.target = target
def get_channel(self):
reused = True
if self.channel is None:
self.channel = grpc.insecure_channel(self.target)
reused = False
return self.channel, reused
def close(self):
if self.channel is not None:
self.channel.close()
self.channel = None
def status_field_names():
"""Return the field names of the status data.
@ -265,21 +287,43 @@ def status_field_names():
], alert_names
def get_status():
def get_status(context=None):
"""Fetch status data and return it in grpc structure format.
Args:
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls. If an existing channel is reused, the RPC
call will be retried at most once, since connectivity may have
been lost and restored in the time since it was last used.
Raises:
grpc.RpcError: Communication or service error.
"""
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
return response.dish_get_status
if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
return response.dish_get_status
while True:
channel, reused = context.get_channel()
try:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
return response.dish_get_status
except grpc.RpcError:
context.close()
if not reused:
raise
def get_id():
def get_id(context=None):
"""Return the ID from the dish status information.
Args:
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls.
Returns:
A string identifying the Starlink user terminal reachable from the
local network.
@ -288,15 +332,19 @@ def get_id():
GrpcError: No user terminal is currently reachable.
"""
try:
status = get_status()
status = get_status(context)
return status.device_info.id
except grpc.RpcError as e:
raise GrpcError(e)
def status_data():
def status_data(context=None):
"""Fetch current status data.
Args:
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls.
Returns:
A tuple with 3 dicts, the first mapping status data names to their
values, the second mapping alert detail field names to their values,
@ -307,7 +355,7 @@ def status_data():
terminal.
"""
try:
status = get_status()
status = get_status(context)
except grpc.RpcError as e:
raise GrpcError(e)
@ -397,16 +445,34 @@ def history_ping_field_names():
]
def get_history():
def get_history(context=None):
"""Fetch history data and return it in grpc structure format.
Args:
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls. If an existing channel is reused, the RPC
call will be retried at most once, since connectivity may have
been lost and restored in the time since it was last used.
Raises:
grpc.RpcError: Communication or service error.
"""
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={}))
return response.dish_get_history
if context is None:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={}))
return response.dish_get_history
while True:
channel, reused = context.get_channel()
try:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={}))
return response.dish_get_history
except grpc.RpcError:
context.close()
if not reused:
raise
def _compute_sample_range(history, parse_samples, start=None, verbose=False):
@ -448,7 +514,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False):
return sample_range, current - start, current
def history_bulk_data(parse_samples, start=None, verbose=False):
def history_bulk_data(parse_samples, start=None, verbose=False, context=None):
"""Fetch history data for a range of samples.
Args:
@ -467,6 +533,8 @@ def history_bulk_data(parse_samples, start=None, verbose=False):
samples as being later than the requested start, and thus include
them (bounded by parse_samples, if it is not -1).
verbose (bool): Optionally produce verbose output.
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls.
Returns:
A tuple with 2 dicts, the first mapping general data names to their
@ -482,7 +550,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False):
terminal.
"""
try:
history = get_history()
history = get_history(context)
except grpc.RpcError as e:
raise GrpcError(e)
@ -523,7 +591,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False):
}
def history_ping_stats(parse_samples, verbose=False):
def history_ping_stats(parse_samples, verbose=False, context=None):
"""Fetch, parse, and compute the packet loss stats.
Note:
@ -533,6 +601,8 @@ def history_ping_stats(parse_samples, verbose=False):
parse_samples (int): Number of samples to process, or -1 to parse all
available samples.
verbose (bool): Optionally produce verbose output.
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls.
Returns:
A tuple with 3 dicts, the first mapping general data names to their
@ -544,7 +614,7 @@ def history_ping_stats(parse_samples, verbose=False):
terminal.
"""
try:
history = get_history()
history = get_history(context)
except grpc.RpcError as e:
raise GrpcError(e)