starlink-grpc-tools/starlink_grpc_tools/dish_obstruction_map.py

232 lines
7.3 KiB
Python

#!/usr/bin/python3
"""Write a PNG image representing Starlink obstruction map data.
This script queries obstruction map data from the Starlink user terminal
reachable on the local network and writes a PNG image based on that data.
"""
import argparse
from datetime import datetime
import logging
import os
import sys
import time
import png
import starlink_grpc_tools.starlink_grpc as starlink_grpc
# Default colors for full color images, as hex strings that parse_args splits
# into one byte per channel in A, R, G, B order (highest byte first).
DEFAULT_OBSTRUCTED_COLOR = "FFFF0000"
DEFAULT_UNOBSTRUCTED_COLOR = "FFFFFFFF"
DEFAULT_NO_DATA_COLOR = "00000000"
# Default colors for greyscale images, as hex strings that parse_args splits
# into an alpha byte (high) followed by a grey level byte (low).
DEFAULT_OBSTRUCTED_GREYSCALE = "FF00"
DEFAULT_UNOBSTRUCTED_GREYSCALE = "FFFF"
DEFAULT_NO_DATA_GREYSCALE = "0000"
# Default loop interval in seconds; 0 means run once without looping.
LOOP_TIME_DEFAULT = 0
def loop_body(opts, context):
    """Fetch one obstruction map from the dish and write it out as a PNG image.

    Args:
        opts: Options namespace produced by ``parse_args``. Reads ``filename``,
            ``sequence``, ``greyscale``, ``no_alpha`` and the per-channel
            ``obstructed_color_*``, ``unobstructed_color_*`` and
            ``no_data_color_*`` values. ``sequence`` is incremented after each
            image that is written successfully.
        context: Channel context handed to ``starlink_grpc.obstruction_map``.

    Returns:
        0 if an image was written, 1 if the map data could not be fetched or
        was empty. Exceptions raised while opening or writing the output file
        are not caught here.
    """
    try:
        snr_data = starlink_grpc.obstruction_map(context)
    except starlink_grpc.GrpcError as e:
        logging.error("Failed getting obstruction map data: %s", str(e))
        return 1

    # Validate before opening the output so that bad data neither leaks an
    # open file handle nor leaves an empty (or truncated) image file behind.
    if not snr_data or not snr_data[0]:
        logging.error("Invalid SNR map data: Zero-length")
        return 1

    # Channel order matches what the PNG writer expects for the selected mode:
    # L or RGB, optionally followed by alpha.
    channels = ("g",) if opts.greyscale else ("r", "g", "b")
    if not opts.no_alpha:
        channels += ("a",)

    # Look the per-channel colors up once instead of once per pixel.
    blend = [
        (
            getattr(opts, "unobstructed_color_" + channel),
            getattr(opts, "obstructed_color_" + channel),
        )
        for channel in channels
    ]
    no_data = [getattr(opts, "no_data_color_" + channel) for channel in channels]

    def pixel_bytes(row):
        """Yield the channel values for every point in one row of the map."""
        for point in row:
            if point > 1.0:
                # shouldn't happen, but just in case...
                point = 1.0
            if point >= 0.0:
                # Linear blend: 0.0 is the obstructed color, 1.0 the
                # unobstructed color.
                for unobstructed, obstructed in blend:
                    yield round(point * unobstructed + (1.0 - point) * obstructed)
            else:
                # Negative values are rendered with the no-data color.
                yield from no_data

    if opts.filename == "-":
        # Open new stdout file to get binary mode
        out_file = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
    else:
        now = int(time.time())
        filename = opts.filename.replace("%u", str(now))
        # time.gmtime gives the same UTC timestamp as the deprecated
        # datetime.utcfromtimestamp.
        filename = filename.replace(
            "%d", time.strftime("%Y_%m_%d_%H_%M_%S", time.gmtime(now))
        )
        filename = filename.replace("%s", str(opts.sequence))
        out_file = open(filename, "wb")

    # The context manager closes (and flushes) the output even if encoding the
    # image raises part way through.
    with out_file:
        writer = png.Writer(
            len(snr_data[0]),
            len(snr_data),
            alpha=(not opts.no_alpha),
            greyscale=opts.greyscale,
        )
        writer.write(out_file, (bytes(pixel_bytes(row)) for row in snr_data))

    opts.sequence += 1
    return 0
def parse_args():
    """Parse the command line and expand the color options into channel values.

    Color options that were not given fall back to the greyscale or full color
    default, depending on ``--greyscale``. Each color is then parsed as a hex
    number and split into 8-bit channel attributes on the returned namespace:
    ``<option>_a`` and ``<option>_g`` for greyscale, or ``<option>_a``,
    ``<option>_r``, ``<option>_g`` and ``<option>_b`` for full color.

    Returns:
        The argparse namespace with the channel attributes added.

    Exits the process with status 1 if a color is not a valid hex number.
    """
    # (short flag, destination name, description, color default, grey default)
    color_options = (
        (
            "-o",
            "obstructed_color",
            "obstructed areas",
            DEFAULT_OBSTRUCTED_COLOR,
            DEFAULT_OBSTRUCTED_GREYSCALE,
        ),
        (
            "-u",
            "unobstructed_color",
            "unobstructed areas",
            DEFAULT_UNOBSTRUCTED_COLOR,
            DEFAULT_UNOBSTRUCTED_GREYSCALE,
        ),
        (
            "-n",
            "no_data_color",
            "areas with no data",
            DEFAULT_NO_DATA_COLOR,
            DEFAULT_NO_DATA_GREYSCALE,
        ),
    )

    parser = argparse.ArgumentParser(
        description="Collect directional obstruction map data from a Starlink user terminal and "
        "emit it as a PNG image"
    )
    parser.add_argument(
        "filename",
        help="The image file to write, or - to write to stdout; may be a template with the "
        "following to be filled in per loop iteration: %%s for sequence number, %%d for UTC date "
        "and time, %%u for seconds since Unix epoch.",
    )
    for short_flag, dest, what, color_default, grey_default in color_options:
        parser.add_argument(
            short_flag,
            "--" + dest.replace("_", "-"),
            help="Color of "
            + what
            + ", in RGB, ARGB, L, or AL hex notation, default: "
            + color_default
            + " or "
            + grey_default,
        )
    parser.add_argument(
        "-g",
        "--greyscale",
        action="store_true",
        help="Emit a greyscale image instead of the default full color image; greyscale images "
        "use L or AL hex notation for the color options",
    )
    parser.add_argument(
        "-z",
        "--no-alpha",
        action="store_true",
        help="Emit an image without alpha (transparency) channel instead of the default that "
        "includes alpha channel",
    )
    parser.add_argument(
        "-e",
        "--target",
        help="host:port of dish to query, default is the standard IP address "
        "and port (192.168.100.1:9200)",
    )
    parser.add_argument(
        "-t",
        "--loop-interval",
        type=float,
        default=float(LOOP_TIME_DEFAULT),
        help="Loop interval in seconds or 0 for no loop, default: "
        + str(LOOP_TIME_DEFAULT),
    )
    parser.add_argument(
        "-s",
        "--sequence",
        type=int,
        default=1,
        help="Starting sequence number for templatized filenames, default: 1",
    )

    opts = parser.parse_args()

    # Fill in whichever colors were not given on the command line.
    for _, dest, _, color_default, grey_default in color_options:
        if getattr(opts, dest) is None:
            fallback = grey_default if opts.greyscale else color_default
            setattr(opts, dest, fallback)

    # Attribute suffix and bit offset of each channel within the hex value.
    if opts.greyscale:
        channel_shifts = (("a", 8), ("g", 0))
    else:
        channel_shifts = (("a", 24), ("r", 16), ("g", 8), ("b", 0))

    for _, dest, _, _, _ in color_options:
        try:
            packed = int(getattr(opts, dest), 16)
        except ValueError:
            logging.error("Invalid hex number for %s", dest)
            sys.exit(1)
        for suffix, shift in channel_shifts:
            setattr(opts, dest + "_" + suffix, (packed >> shift) & 255)

    return opts
def main():
    """Run the script: parse options, then write one image or loop writing them.

    With a loop interval of 0 a single image is written and the process exits
    with the status returned by ``loop_body``. With a positive interval,
    ``loop_body`` is called repeatedly, sleeping between calls so that
    iterations start ``loop_interval`` seconds apart where possible. The
    channel context is closed on the way out in either case.
    """
    opts = parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s")
    context = starlink_grpc.ChannelContext(target=opts.target)

    try:
        next_loop = time.monotonic()
        rc = loop_body(opts, context)
        while opts.loop_interval > 0.0:
            now = time.monotonic()
            # Schedule from the previous target time rather than from now so
            # the interval does not drift; if an iteration overran, the next
            # one starts immediately.
            next_loop = max(next_loop + opts.loop_interval, now)
            time.sleep(next_loop - now)
            rc = loop_body(opts, context)
    finally:
        context.close()

    sys.exit(rc)


if __name__ == "__main__":
    main()