341 lines
11 KiB
Python
341 lines
11 KiB
Python
import asyncio
|
|
import functools
|
|
import logging
|
|
import pathlib
|
|
|
|
import click
|
|
|
|
# from PIL import ImageDraw
|
|
# from PIL import ImageFont
|
|
from pydantic import parse_obj_as
|
|
from pydantic_yaml import parse_yaml_raw_as
|
|
from typing_extensions import Self
|
|
|
|
from greendeck.config import Config
|
|
from greendeck.config import HomeAssistantIconImageConfig
|
|
from greendeck.config import HomeAssistantServiceActionConfig
|
|
from greendeck.config import ImageConfig
|
|
from greendeck.config import PulseAudioActionConfig
|
|
from greendeck.config import StreamDeckConfig
|
|
from greendeck.config import TileConfig
|
|
from greendeck.lib.actions.pulseaudio import handle_pulseaudio_action
|
|
from greendeck.lib.elgato import enumerate_streamdecks
|
|
from greendeck.lib.elgato.streamdeck import StreamDeck
|
|
from greendeck.lib.homeassistant import EventResponse
|
|
from greendeck.lib.homeassistant import HomeAssistant
|
|
from greendeck.lib.images.staticiconimage import render_static_icon_image
|
|
from greendeck.lib.logging import setup_logging
|
|
from greendeck.lib.util import task_done_callback
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DeckHandler:
|
|
"""Class to manage connected StreamDeck."""
|
|
|
|
deck: StreamDeck
|
|
deck_config: StreamDeckConfig
|
|
config: Config
|
|
homeassistant: HomeAssistant
|
|
homeassistant_states: list[str | None]
|
|
key_states: list[bool | None]
|
|
|
|
def __init__(
|
|
self,
|
|
deck: StreamDeck,
|
|
deck_config: StreamDeckConfig,
|
|
config: Config,
|
|
homeassistant: HomeAssistant,
|
|
):
|
|
self.deck = deck
|
|
self.deck_config = deck_config
|
|
self.config = config
|
|
self.homeassistant = homeassistant
|
|
self.current_screen = self.deck_config.screen
|
|
self.homeassistant_states = [None] * self.deck.KEY_COUNT
|
|
self.key_states = [None] * self.deck.KEY_COUNT
|
|
|
|
def convert_key_to_position(self: Self, key: int) -> tuple[int, int]:
|
|
return self.deck.convert_key_to_position(key)
|
|
|
|
def get_screen_config(self: Self) -> list[TileConfig] | None:
|
|
return self.config.screens.get(self.current_screen)
|
|
|
|
def get_tile_config_for_key(self: Self, key: int) -> TileConfig | None:
|
|
position = self.convert_key_to_position(key)
|
|
|
|
screen = self.get_screen_config()
|
|
if screen is None:
|
|
return None
|
|
|
|
for tile_config in screen:
|
|
if position == tile_config.position:
|
|
return tile_config
|
|
|
|
return None
|
|
|
|
async def start(self: Self):
|
|
await self.deck.open()
|
|
await self.deck.reset()
|
|
|
|
serial = await self.deck.get_serial_number()
|
|
fw = await self.deck.get_firmware_version()
|
|
logger.info(
|
|
f"opened '{self.deck.DECK_TYPE}' device "
|
|
f"(serial number: '{serial}', fw: '{fw}')"
|
|
)
|
|
|
|
self.deck.set_key_callback(self.key_callback)
|
|
|
|
await self.deck.set_brightness(self.deck_config.brightness)
|
|
|
|
tasks: list[asyncio.Task] = []
|
|
for key in range(self.deck.KEY_COUNT):
|
|
tasks.append(asyncio.create_task(self.initialize_key(key)))
|
|
|
|
for task in tasks:
|
|
task.add_done_callback(task_done_callback)
|
|
|
|
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
|
|
|
|
async def initialize_key(self: Self, key: int) -> None:
|
|
await self.update_key_image(key)
|
|
await self.update_homeassistant_callbacks(key)
|
|
|
|
async def update_key_state(
|
|
self: Self, key: int, old_state: bool | None, new_state: bool
|
|
) -> None:
|
|
self.key_states[key] = new_state
|
|
|
|
if new_state is False:
|
|
t = asyncio.create_task(
|
|
self.process_key_actions(key),
|
|
name=(
|
|
f"process key actions on streamdeck "
|
|
f"{self.deck.serial_number} key {key}"
|
|
),
|
|
)
|
|
t.add_done_callback(task_done_callback)
|
|
|
|
if old_state is not None and not (old_state ^ new_state):
|
|
logger.info("no change - skipping image update")
|
|
return
|
|
|
|
t = asyncio.create_task(
|
|
self.update_key_image(key),
|
|
name=f"update image on streamdeck {self.deck.serial_number} key {key}",
|
|
)
|
|
t.add_done_callback(task_done_callback)
|
|
|
|
async def update_key_image(self: Self, key: int) -> None:
|
|
tile_config = self.get_tile_config_for_key(key)
|
|
|
|
if tile_config is None:
|
|
image = self.deck.BLANK_KEY_IMAGE
|
|
|
|
else:
|
|
match tile_config.image.type:
|
|
case "static-icon":
|
|
image_config = parse_obj_as(ImageConfig, tile_config.image)
|
|
image = await render_static_icon_image(
|
|
self.deck, self.key_states[key], image_config
|
|
)
|
|
|
|
case "homeassistant-icon":
|
|
home_assistant_icon_image_config = parse_obj_as(
|
|
HomeAssistantIconImageConfig, tile_config.image
|
|
)
|
|
|
|
image_config = home_assistant_icon_image_config.states.get(
|
|
self.homeassistant_states[key]
|
|
)
|
|
|
|
if image_config is None:
|
|
image_config = home_assistant_icon_image_config.states.get(None)
|
|
|
|
if image_config is not None:
|
|
image = await render_static_icon_image(
|
|
self.deck, self.key_states[key], image_config
|
|
)
|
|
|
|
else:
|
|
print(f"no state matched {self.homeassistant_states[key]}")
|
|
image = self.deck.BLANK_KEY_IMAGE
|
|
|
|
case _:
|
|
image = self.deck.BLANK_KEY_IMAGE
|
|
|
|
await self.deck.set_key_image(key, image)
|
|
|
|
async def key_callback(
|
|
self: Self,
|
|
deck: StreamDeck,
|
|
key: int,
|
|
old_state: bool | None,
|
|
new_state: bool,
|
|
) -> None:
|
|
assert self.deck.serial_number == deck.serial_number
|
|
|
|
position = self.convert_key_to_position(key)
|
|
|
|
logger.info(
|
|
f"deck {deck.serial_number} key {key} {position} {old_state} → {new_state}",
|
|
)
|
|
|
|
await self.update_key_state(key, old_state, new_state)
|
|
|
|
async def process_key_actions(self: Self, key: int):
|
|
tile_config = self.get_tile_config_for_key(key)
|
|
if tile_config is not None:
|
|
for action_config in tile_config.actions:
|
|
match action_config.__root__.type:
|
|
case "homeassistant-service":
|
|
action_config = HomeAssistantServiceActionConfig.parse_obj(
|
|
action_config.__root__
|
|
)
|
|
await self.homeassistant.call_service(
|
|
domain=action_config.domain,
|
|
service=action_config.service,
|
|
data=action_config.data,
|
|
)
|
|
|
|
case "pulseaudio-action":
|
|
action_config = PulseAudioActionConfig.parse_obj(
|
|
action_config.__root__
|
|
)
|
|
await handle_pulseaudio_action(action_config)
|
|
|
|
case _:
|
|
logger.error(f"unimplemented {action_config.__root__.type}")
|
|
|
|
async def homeassistant_callback(
|
|
self: Self,
|
|
key: int,
|
|
event: EventResponse,
|
|
) -> None:
|
|
self.homeassistant_states[key] = event.event.data.new_state.state
|
|
|
|
t = asyncio.create_task(self.update_key_image(key))
|
|
t.add_done_callback(task_done_callback)
|
|
|
|
async def update_homeassistant_callbacks(self: Self, key: int):
|
|
tile_config = self.get_tile_config_for_key(key)
|
|
if tile_config is None:
|
|
return
|
|
|
|
match tile_config.image.type:
|
|
case "homeassistant-icon":
|
|
homeassistant_icon_image_config = parse_obj_as(
|
|
HomeAssistantIconImageConfig, tile_config.image
|
|
)
|
|
self.homeassistant.add_event_callback(
|
|
homeassistant_icon_image_config.entity_id,
|
|
functools.partial(
|
|
self.homeassistant_callback,
|
|
key,
|
|
),
|
|
)
|
|
event_response = await self.homeassistant.get_state(
|
|
homeassistant_icon_image_config.entity_id
|
|
)
|
|
if event_response is not None:
|
|
self.homeassistant_states[key] = event_response.state
|
|
await self.update_key_image(key)
|
|
|
|
|
|
async def _main(config: Config):
    """Match configured StreamDecks to connected devices and run their handlers.

    Devices are matched to configuration entries first by serial number, then
    by enumeration index.  A Home Assistant client and one ``DeckHandler`` per
    matched deck are started, after which the coroutine sleeps in a loop until
    it is interrupted.  Every matched deck that is still open is reset and
    closed on the way out.
    """
    decks: dict[str, tuple[StreamDeck, StreamDeckConfig]] = {}

    try:
        streamdecks = await enumerate_streamdecks()

        if len(streamdecks) == 1:
            logger.info("found 1 StreamDeck")
        else:
            logger.info(f"found {len(streamdecks)} StreamDecks")

        # Pass 1: configuration entries that name a serial number.
        for deck_config in config.streamdecks:
            if deck_config.serial_number is None:
                continue
            for deck in streamdecks:
                if deck_config.serial_number == deck.serial_number:
                    decks[deck.serial_number] = (deck, deck_config)

        # Pass 2: configuration entries that name an enumeration index.
        # NOTE(review): a negative index is not rejected by the bounds check
        # below and would index from the end of the list -- confirm intended.
        for deck_config in config.streamdecks:
            if deck_config.index is None:
                continue
            if deck_config.index >= len(streamdecks):
                logger.error(f"index {deck_config.index} is out of bounds")
                continue
            deck = streamdecks[deck_config.index]
            if deck.serial_number in decks:
                logger.error(
                    f"deck {deck_config.index} was already configured "
                    f"by serial number {deck.serial_number}"
                )
                continue
            decks[deck.serial_number] = (deck, deck_config)

        homeassistant = HomeAssistant(
            host=config.homeassistant.hostname,
            port=config.homeassistant.port,
            secure=config.homeassistant.secure,
            token=config.homeassistant.token,
        )

        tasks = [asyncio.create_task(homeassistant.start(), name="homeassistant")]

        for deck, deck_config in decks.values():
            deck_handler = DeckHandler(deck, deck_config, config, homeassistant)
            tasks.append(
                asyncio.create_task(
                    deck_handler.start(),
                    name=f"streamdeck {deck.serial_number}",
                )
            )

        for task in tasks:
            task.add_done_callback(task_done_callback)

        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

        # Keep the event loop alive so deck and Home Assistant callbacks keep firing.
        while True:
            await asyncio.sleep(5)

    except KeyboardInterrupt:
        logger.info("keyboard_interrupt")

    finally:
        for deck, _ in decks.values():
            logger.info(f"shutting down deck {deck.serial_number}")
            if await deck.is_open():
                try:
                    await deck.reset()
                finally:
                    # Close the device even if the reset failed.
                    await deck.close()
|
|
|
|
|
|
@click.command
@click.option(
    "--config",
    "config_path",
    type=click.Path(
        exists=True,
        file_okay=True,
        dir_okay=False,
        allow_dash=False,
        path_type=pathlib.Path,
    ),
    default="./config.yaml",
)
def main(config_path: pathlib.Path):
    """Command-line entry point: load the YAML configuration and run the app.

    ``--config`` must point to an existing file and defaults to
    ``./config.yaml``.
    """
    setup_logging()

    raw_config = config_path.read_bytes()
    config: Config = parse_yaml_raw_as(Config, raw_config)

    asyncio.run(_main(config))
|
|
|
|
|
|
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
|