greendeck/greendeck/lib/hidapi/library.py
2022-12-13 14:25:50 -06:00

367 lines
10 KiB
Python

import asyncio
import atexit
import ctypes
import os
import platform
from typing import Any
from typing import TypeVar
from pydantic import BaseModel
from greendeck.lib.hidapi import HIDAPIError
Handle = TypeVar("Handle", bound=ctypes.c_void_p)
# mutex: asyncio.Lock = None
LOOP: asyncio.AbstractEventLoop = None
LIBRARY: ctypes.CDLL = None
class DeviceInfo(BaseModel):
path: str
serial_number: str | None
vendor_id: int
product_id: int
release_number: int
product_string: str | None
usage_page: int
usage: int
interface_number: int
class hid_device_info(ctypes.Structure):
"""
Structure definition for the hid_device_info structure defined
in the LibUSB HIDAPI library API.
"""
pass
hid_device_info._fields_ = [
("path", ctypes.c_char_p),
("vendor_id", ctypes.c_ushort),
("product_id", ctypes.c_ushort),
("serial_number", ctypes.c_wchar_p),
("release_number", ctypes.c_ushort),
("manufacturer_string", ctypes.c_wchar_p),
("product_string", ctypes.c_wchar_p),
("usage_page", ctypes.c_ushort),
("usage", ctypes.c_ushort),
("interface_number", ctypes.c_int),
("next", ctypes.POINTER(hid_device_info)),
]
def _get_library_search_list():
if (hidapi_library := os.getenv("HIDAPI_LIBRARY")) is not None:
return [hidapi_library]
if (hidapi_home := os.getenv("HIDAPI_HOME")) is not None:
return [
os.path.join(hidapi_home, "lib", "libhidapi-libusb.so.0"),
]
library_search_list = {
"Windows": ["hidapi.dll", "libhidapi-0.dll"],
"Linux": [
"libhidapi-libusb.so",
"libhidapi-libusb.so.0",
],
"Darwin": ["libhidapi.dylib"],
}.get(platform.system())
if library_search_list is None:
raise HIDAPIError(
"No suitable LibUSB HIDAPI library search names were found for this system."
)
return library_search_list
def _find_hidapi_libary() -> ctypes.CDLL:
for lib_name in _get_library_search_list():
try:
return ctypes.cdll.LoadLibrary(lib_name)
except OSError as e:
print(e)
pass
if LIBRARY is None:
raise HIDAPIError("No suitable LibUSB HIDAPI library found on this system.")
def _load_hidapi_library():
"""
Loads the given LibUSB HIDAPI dynamic library from the host system,
if available.
:rtype: ctypes.CDLL
:return: Loaded HIDAPI library instance, or None if no library was found.
"""
global LIBRARY
if LIBRARY is not None:
return
LIBRARY = _find_hidapi_libary()
LIBRARY.hid_init.argtypes = []
LIBRARY.hid_init.restype = ctypes.c_int
LIBRARY.hid_exit.argtypes = []
LIBRARY.hid_exit.restype = ctypes.c_int
LIBRARY.hid_enumerate.argtypes = [ctypes.c_ushort, ctypes.c_ushort]
LIBRARY.hid_enumerate.restype = ctypes.POINTER(hid_device_info)
LIBRARY.hid_free_enumeration.argtypes = [ctypes.POINTER(hid_device_info)]
LIBRARY.hid_free_enumeration.restype = None
LIBRARY.hid_open_path.argtypes = [ctypes.c_char_p]
LIBRARY.hid_open_path.restype = ctypes.c_void_p
LIBRARY.hid_close.argtypes = [ctypes.c_void_p]
LIBRARY.hid_close.restype = None
LIBRARY.hid_set_nonblocking.argtypes = [ctypes.c_void_p, ctypes.c_int]
LIBRARY.hid_set_nonblocking.restype = ctypes.c_int
LIBRARY.hid_send_feature_report.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_char),
ctypes.c_size_t,
]
LIBRARY.hid_send_feature_report.restype = ctypes.c_int
LIBRARY.hid_get_feature_report.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_char),
ctypes.c_size_t,
]
LIBRARY.hid_get_feature_report.restype = ctypes.c_int
LIBRARY.hid_write.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_char),
ctypes.c_size_t,
]
LIBRARY.hid_write.restype = ctypes.c_int
LIBRARY.hid_read.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_char),
ctypes.c_size_t,
]
LIBRARY.hid_read.restype = ctypes.c_int
LIBRARY.hid_init()
atexit.register(LIBRARY.hid_exit)
async def run_in_executor(name: str, *args: tuple, **keywords: dict) -> Any:
global LOOP
global LIBRARY
if LOOP is None:
LOOP = asyncio.get_running_loop()
if LIBRARY is None:
_load_hidapi_library()
func = LIBRARY.__getattribute__(name)
return await LOOP.run_in_executor(None, func, *args, **keywords)
async def hid_enumerate(
vendor_id: int | None = None, product_id: int | None = None
) -> list[DeviceInfo]:
"""
Enumerates all available USB HID devices on the system.
:param int vid: USB Vendor ID to filter all devices by, `None` if the
device list should not be filtered by vendor.
:param int pid: USB Product ID to filter all devices by, `None` if the
device list should not be filtered by product.
:rtype: list(dict())
:return: List of discovered USB HID device attributes.
"""
vendor_id = vendor_id or 0
product_id = product_id or 0
device_list: list[dict] = []
device_enumeration: hid_device_info = await run_in_executor(
"hid_enumerate", vendor_id, product_id
)
if device_enumeration:
current_device = device_enumeration
while current_device:
device_list.append(
DeviceInfo(
path=current_device.contents.path.decode("utf-8"),
vendor_id=current_device.contents.vendor_id,
product_id=current_device.contents.product_id,
serial_number=current_device.contents.serial_number,
release_number=current_device.contents.release_number,
manufacturer_string=current_device.contents.manufacturer_string,
product_string=current_device.contents.product_string,
usage_page=current_device.contents.usage_page,
usage=current_device.contents.usage,
interface_number=current_device.contents.interface_number,
)
)
current_device = current_device.contents.next
await run_in_executor("hid_free_enumeration", device_enumeration)
return device_list
async def hid_open_device(path: str | bytes) -> Handle:
"""
Opens a HID device by its canonical path on the host system.
:rtype: Handle
:return: Device handle if opened successfully, None if open failed.
"""
if type(path) is not bytes:
path = bytes(path, "utf-8")
handle: Handle = await run_in_executor("hid_open_path", path)
if not handle:
raise HIDAPIError("Could not open HID device.")
return handle
async def hid_close_device(handle: Handle):
"""
Closes a HID device by its open device handle on the host system.
:param Handle handle: Device handle to close.
"""
if handle:
await run_in_executor("hid_close", handle)
async def hid_send_feature_report(handle: Handle, data: bytes) -> int:
"""
Sends a HID Feature report to an open HID device.
:param Handle handle: Device handle to access.
:param bytes data: Array of bytes to send to the device, as a
feature report. The first byte of the
report should be the Report ID of the
report being sent.
:rtype: int
:return: Number of bytes successfully sent to the device.
"""
if not handle:
raise HIDAPIError("No HID device.")
result: int = await run_in_executor(
"hid_send_feature_report", handle, data, len(data)
)
if result < 0:
raise HIDAPIError("Failed to write feature report (%d)" % result)
return result
async def hid_get_feature_report(handle: Handle, report_id: int, length: int) -> bytes:
"""
Retrieves a HID Feature report from an open HID device.
:param Handle handle: Device handle to access.
:param int report_id: Report ID of the report being read.
:param int length: Maximum length of the Feature report to read.
:rtype: bytes
:return: Array of bytes containing the read Feature report. The
first byte of the report will be the Report ID of the
report that was read.
"""
data = ctypes.create_string_buffer(length)
data[0] = report_id
if not handle:
raise HIDAPIError("No HID device.")
# loop = asyncio.get_event_loop()
result: int = await run_in_executor(
"hid_get_feature_report", handle, data, len(data)
)
if result < 0:
raise HIDAPIError("Failed to read feature report (%d)" % result)
return data.raw[:result]
async def hid_write(handle: Handle, data: bytes) -> int:
"""
Writes a HID Out report to an open HID device.
:param Handle handle: Device handle to access.
:param bytearray() data: Array of bytes to send to the device, as an
out report. The first byte of the report
should be the Report ID of the report being
sent.
:rtype: int
:return: Number of bytes successfully sent to the device.
"""
if not handle:
raise HIDAPIError("No HID device.")
result: int = await run_in_executor("hid_write", handle, data, len(data))
if result < 0:
raise HIDAPIError("Failed to write out report (%d)" % result)
return result
async def hid_read(handle: Handle, length: int) -> bytes | None:
"""
Performs a non-blocking read of a HID In report from an open HID device.
:param Handle handle: Device handle to access.
:param int length: Maximum length of the In report to read.
:rtype: bytes
:return: Array of bytes containing the read In report. The
first byte of the report will be the Report ID of the
report that was read.
"""
data = ctypes.create_string_buffer(length)
if not handle:
raise HIDAPIError("No HID device.")
result: int = await run_in_executor("hid_read", handle, data, len(data))
if result < 0:
raise HIDAPIError(f"Failed to read in report ({result})")
elif result == 0:
return None
return data.raw[:length]