diff --git a/fediplug/buttplugio.py b/fediplug/buttplugio.py index 57f201e..4938739 100644 --- a/fediplug/buttplugio.py +++ b/fediplug/buttplugio.py @@ -36,20 +36,22 @@ async def scan_devices(plug_client): return plug_client -async def trigger_actuators(plug_client, play_command): +async def trigger_actuators(plug_client, actuator_command): + MAX_DURATION = 60 # maximum duration in seconds + MAX_POWER = 1 # has to be 0 <= n <= 1 or it will not work + duration = clamp(actuator_command[0], 0, MAX_DURATION) + power = clamp(actuator_command[1], 0, MAX_POWER) # If we have any device we can access it by its ID: device = plug_client.devices[0] - # The most common case among devices is that they have some actuators - # which accept a scalar value (0.0-1.0) as their command. - play_command = (1, 1) if len(device.actuators) != 0: print(len(device.actuators), "actuators found") # cycle through all actuators in device print(device.actuators) + print(f"{duration=} {power=}") for actuator in device.actuators: - await actuator.command(play_command[0]) + await actuator.command(power) print("generic actuator") - await asyncio.sleep(play_command[1]) + await asyncio.sleep(duration) #stops all actuators for actuator in device.actuators: await actuator.command(0) @@ -73,6 +75,9 @@ async def disconnect_plug_client(plug_client): # Disconnect the plug_client. await plug_client.disconnect() +def clamp(n, smallest, largest): + '''returns the closest value to n still in range (clamp)''' + return max(smallest, min(n, largest)) # First things first. We set logging to the console and at INFO level. logging.basicConfig(stream=sys.stdout, level=logging.INFO) diff --git a/fediplug/mastodon.py b/fediplug/mastodon.py index de53dd6..e62a058 100644 --- a/fediplug/mastodon.py +++ b/fediplug/mastodon.py @@ -9,6 +9,8 @@ import lxml.html as lh from lxml.html.clean import clean_html import mastodon import asyncio +import re +from itertools import cycle from fediplug.cli import options import fediplug.keyring as keyring @@ -31,6 +33,23 @@ class StreamListener(mastodon.StreamListener): self.instance = instance self.users = users self.event_loop = event_loop + self.regular_expression = re.compile(r"((?:\b(?:\d+s)(?:\s|\b))+(?:\d+%)?)+") + # extracts commands from captured toots for usage in buttplug.io actuator + # if no power % is given, a default will be set later + # examples: + # input: "#fediplug @nova_ have fun :) 10s 50% 4s 5s" + # output: ["10s 50%", "4s 5s"] + # + # input: "60% 10s @maeve 5s 7s 10% foo bar 8s baz 20% 30s 1337." + # output: ["10s 5s 7s 10%", "8s 20%", "30s"] + # + # input: "10s6 80%" + # output: [] + # + # watch out for this quirk: + # input "10s 70%8" + # output: ["10s 70%"] + # TODO: fix this, it should match the 70% because there isnt a word boundary after it if options['debug']: print(rf'listener initialized with users={self.users}') @@ -40,6 +59,7 @@ class StreamListener(mastodon.StreamListener): print(rf'incoming status: acct={status.account.acct}') if self.users and normalize_username(status.account.acct, self.instance) not in self.users: + # TODO: only do this if no toot from self.users with #fediplug has been captured yet, else check in_reply_to_ID if options['debug']: print('skipping status due to username filtering') return @@ -49,13 +69,14 @@ class StreamListener(mastodon.StreamListener): print(rf'expecting: {LISTEN_TO_HASHTAG}, extracted tags: {tags}') if LISTEN_TO_HASHTAG in tags: + # TODO: if Hashtag matches and toot is from mentioned account, then get toot ID + ''' Here we extract the instructions for the butplug''' - # TODO: still need to write extraction code - buttplug_instructions = extract_buttplug_instructions(status) - click.echo('queueing instructions') - self.event_loop.run_until_complete(trigger_actuators(self.plug_client, buttplug_instructions)) - - + buttplug_instructions = extract_buttplug_instructions(status, self.regular_expression) + if buttplug_instructions: # check if buttplug_instructions is not empty + for buttplug_instruction in buttplug_instructions: + click.echo(f'queueing instructions {buttplug_instruction}') + self.event_loop.run_until_complete(trigger_actuators(self.plug_client, buttplug_instruction)) ''' if options['debug']: @@ -102,15 +123,6 @@ def stream(instance, users, client_id, client_secret, access_token, plug_client, users = [normalize_username(user, instance) for user in users] listener = StreamListener(plug_client, instance, users, event_loop) - - existing_statuses = client.timeline_hashtag(LISTEN_TO_HASHTAG, limit=1) - - if options['debug']: - print(rf'existing_statuses: {existing_statuses}') - - for status in existing_statuses: - listener.on_update(status) - click.echo(f'==> Streaming from {instance}') client.stream_user(listener) @@ -130,11 +142,22 @@ def normalize_username(user, instance): else: return user -def extract_buttplug_instructions(toot): +def extract_buttplug_instructions(status, regular_expression): '''Extract buttplug instruction informations from a toot.''' - doc_list = [] - doc = lh.fromstring(toot['content']) - doc = clean_html(doc) - doc_list.append(doc.text_content()) - print(rf'extracted buttplug_instruction: {doc_list}') - return doc_list \ No newline at end of file + toot = lh.fromstring(status['content']) + toot = clean_html(toot) + toot = toot.text_content() + instructions = regular_expression.findall(toot) + actuator_commands = [] # List of tuples with (duration in seconds, power in range 0..1) + for instruction in instructions: + commands = instruction.strip().split(" ") + print(commands) + if commands[-1][-1] != "%": + commands.append("100%") + commands = [int(command[:-1]) for command in commands] + power = commands.pop()/100 # convert power from % to range 0..1 + commands = list(zip(commands, cycle([power]))) + print(commands) + actuator_commands.extend(commands) + print(rf'extracted buttplug_instruction: {actuator_commands}') + return actuator_commands \ No newline at end of file