prevent some buttplugio related crashes

Co-authored-by. M\303\246ve Rey <42996147M-rey@users.noreply.github.com>
This commit is contained in:
Nova 2023-03-14 01:09:42 +01:00
parent 09045cdc82
commit cda5265d60

View file

@ -20,16 +20,26 @@ async def connect_plug_client(websocket_url: str) -> Client:
await plug_client.connect(connector) await plug_client.connect(connector)
except Exception as e: except Exception as e:
logging.error(f"Could not connect to server, exiting: {e}") logging.error(f"Could not connect to server, exiting: {e}")
return exit(1)
print("plug client connected") print("plug client connected")
return plug_client return plug_client
async def scan_devices(plug_client: Client) -> Client: async def scan_devices(plug_client: Client) -> Client:
MAX_RETRIES = 5
for i in range(MAX_RETRIES):
# scan for devices for 5 seconds # scan for devices for 5 seconds
if options["debug"]:
print(f"scanning for devices ({i+1}/{MAX_RETRIES}) tries")
await plug_client.start_scanning() await plug_client.start_scanning()
await asyncio.sleep(5) await asyncio.sleep(3)
await plug_client.stop_scanning() await plug_client.stop_scanning()
await asyncio.sleep(0.5)
if plug_client.devices:
break
else:
logging.error(f"No devices found, exiting...")
exit(1)
plug_client.logger.info(f"Devices: {plug_client.devices}") plug_client.logger.info(f"Devices: {plug_client.devices}")
# If we have any device we can print the list of devices # If we have any device we can print the list of devices
@ -48,19 +58,22 @@ async def trigger_actuators(
duration = clamp(actuator_command[0], 0, MAX_DURATION) duration = clamp(actuator_command[0], 0, MAX_DURATION)
power = clamp(actuator_command[1], 0, MAX_POWER) power = clamp(actuator_command[1], 0, MAX_POWER)
# If we have any device we can access it by its ID: # If we have any device we can access it by its ID:
device = plug_client.devices[0] print(plug_client.devices)
if len(device.actuators) != 0: for key in plug_client.devices:
print(len(device.actuators), "actuators found") device = plug_client.devices[key]
# cycle through all actuators in device try:
print(device.actuators) if options["debug"]:
print(f"{duration=} {power=}") print(f"using device {device}")
for actuator in device.actuators: print(len(device.actuators), f"actuators found:{device.actuators}")
# print(device.actuators)
print(f"sending {duration=} {power=}")
for actuator in device.actuators: # cycle through all actuators in device
await actuator.command(power) await actuator.command(power)
print("generic actuator")
await asyncio.sleep(duration) await asyncio.sleep(duration)
# stops all actuators for actuator in device.actuators: # stops all actuators
for actuator in device.actuators:
await actuator.command(0) await actuator.command(0)
except Exception as e:
logging.error(f"Could not find devices/actuators: {e}")
""" """