Parse device instructions from toots (#15)

* Discard matching previous toots

We want to only take into account toot that are newly coming in while
the program is running. Previously it would also get the most recent matching
toot from before running the program.

Co-authored-by: Mæve Rey <42996147+m-rey@users.noreply.github.com>

* Add feature to execute device commands from toot

if a toot is written in a certain format containing instructions for the
buttplug.io device, they will be extracted and passed to the device.

Co-authored-by: Mæve Rey <42996147+m-rey@users.noreply.github.com>

---------

Co-authored-by: Mæve Rey <42996147+m-rey@users.noreply.github.com>
This commit is contained in:
Nova 2023-03-02 00:43:28 +01:00 committed by GitHub
parent 7a94771c70
commit 1b69fa5aba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 28 deletions

View file

@ -36,20 +36,22 @@ async def scan_devices(plug_client):
return plug_client
async def trigger_actuators(plug_client, play_command):
async def trigger_actuators(plug_client, actuator_command):
MAX_DURATION = 60 # maximum duration in seconds
MAX_POWER = 1 # has to be 0 <= n <= 1 or it will not work
duration = clamp(actuator_command[0], 0, MAX_DURATION)
power = clamp(actuator_command[1], 0, MAX_POWER)
# If we have any device we can access it by its ID:
device = plug_client.devices[0]
# The most common case among devices is that they have some actuators
# which accept a scalar value (0.0-1.0) as their command.
play_command = (1, 1)
if len(device.actuators) != 0:
print(len(device.actuators), "actuators found")
# cycle through all actuators in device
print(device.actuators)
print(f"{duration=} {power=}")
for actuator in device.actuators:
await actuator.command(play_command[0])
await actuator.command(power)
print("generic actuator")
await asyncio.sleep(play_command[1])
await asyncio.sleep(duration)
#stops all actuators
for actuator in device.actuators:
await actuator.command(0)
@ -73,6 +75,9 @@ async def disconnect_plug_client(plug_client):
# Disconnect the plug_client.
await plug_client.disconnect()
def clamp(n, smallest, largest):
'''returns the closest value to n still in range (clamp)'''
return max(smallest, min(n, largest))
# First things first. We set logging to the console and at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

View file

@ -9,6 +9,8 @@ import lxml.html as lh
from lxml.html.clean import clean_html
import mastodon
import asyncio
import re
from itertools import cycle
from fediplug.cli import options
import fediplug.keyring as keyring
@ -31,6 +33,23 @@ class StreamListener(mastodon.StreamListener):
self.instance = instance
self.users = users
self.event_loop = event_loop
self.regular_expression = re.compile(r"((?:\b(?:\d+s)(?:\s|\b))+(?:\d+%)?)+")
# extracts commands from captured toots for usage in buttplug.io actuator
# if no power % is given, a default will be set later
# examples:
# input: "#fediplug @nova_ have fun :) 10s 50% 4s 5s"
# output: ["10s 50%", "4s 5s"]
#
# input: "60% 10s @maeve 5s 7s 10% foo bar 8s baz 20% 30s 1337."
# output: ["10s 5s 7s 10%", "8s 20%", "30s"]
#
# input: "10s6 80%"
# output: []
#
# watch out for this quirk:
# input "10s 70%8"
# output: ["10s 70%"]
# TODO: fix this, it should match the 70% because there isnt a word boundary after it
if options['debug']:
print(rf'listener initialized with users={self.users}')
@ -40,6 +59,7 @@ class StreamListener(mastodon.StreamListener):
print(rf'incoming status: acct={status.account.acct}')
if self.users and normalize_username(status.account.acct, self.instance) not in self.users:
# TODO: only do this if no toot from self.users with #fediplug has been captured yet, else check in_reply_to_ID
if options['debug']:
print('skipping status due to username filtering')
return
@ -49,13 +69,14 @@ class StreamListener(mastodon.StreamListener):
print(rf'expecting: {LISTEN_TO_HASHTAG}, extracted tags: {tags}')
if LISTEN_TO_HASHTAG in tags:
# TODO: if Hashtag matches and toot is from mentioned account, then get toot ID
''' Here we extract the instructions for the butplug'''
# TODO: still need to write extraction code
buttplug_instructions = extract_buttplug_instructions(status)
click.echo('queueing instructions')
self.event_loop.run_until_complete(trigger_actuators(self.plug_client, buttplug_instructions))
buttplug_instructions = extract_buttplug_instructions(status, self.regular_expression)
if buttplug_instructions: # check if buttplug_instructions is not empty
for buttplug_instruction in buttplug_instructions:
click.echo(f'queueing instructions {buttplug_instruction}')
self.event_loop.run_until_complete(trigger_actuators(self.plug_client, buttplug_instruction))
'''
if options['debug']:
@ -102,15 +123,6 @@ def stream(instance, users, client_id, client_secret, access_token, plug_client,
users = [normalize_username(user, instance) for user in users]
listener = StreamListener(plug_client, instance, users, event_loop)
existing_statuses = client.timeline_hashtag(LISTEN_TO_HASHTAG, limit=1)
if options['debug']:
print(rf'existing_statuses: {existing_statuses}')
for status in existing_statuses:
listener.on_update(status)
click.echo(f'==> Streaming from {instance}')
client.stream_user(listener)
@ -130,11 +142,22 @@ def normalize_username(user, instance):
else:
return user
def extract_buttplug_instructions(toot):
def extract_buttplug_instructions(status, regular_expression):
'''Extract buttplug instruction informations from a toot.'''
doc_list = []
doc = lh.fromstring(toot['content'])
doc = clean_html(doc)
doc_list.append(doc.text_content())
print(rf'extracted buttplug_instruction: {doc_list}')
return doc_list
toot = lh.fromstring(status['content'])
toot = clean_html(toot)
toot = toot.text_content()
instructions = regular_expression.findall(toot)
actuator_commands = [] # List of tuples with (duration in seconds, power in range 0..1)
for instruction in instructions:
commands = instruction.strip().split(" ")
print(commands)
if commands[-1][-1] != "%":
commands.append("100%")
commands = [int(command[:-1]) for command in commands]
power = commands.pop()/100 # convert power from % to range 0..1
commands = list(zip(commands, cycle([power])))
print(commands)
actuator_commands.extend(commands)
print(rf'extracted buttplug_instruction: {actuator_commands}')
return actuator_commands