Add type hinting, format with black (#24)

* Add style indicatorsr

* add type hinting, format with black

Co-authored-by: Mæve Rey <42996147+m-rey@users.noreply.github.com>
This commit is contained in:
Nova 2023-03-13 22:07:13 +01:00 committed by GitHub
parent be11f68887
commit e14b1af877
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 210 additions and 132 deletions

View file

@ -1,3 +1,3 @@
'''A Mastodon client that automatically vibrates your buttplug.io devices as people on your timeline toot instructions.'''
"""A Mastodon client that automatically vibrates your buttplug.io devices as people on your timeline toot instructions."""
from fediplug.cli import cli

View file

@ -1,5 +1,6 @@
'''Hook for running fediplug module as a script.'''
"""Hook for running fediplug module as a script."""
from fediplug.cli import cli
cli()
if __name__ == "__main__":
cli()

View file

@ -1,4 +1,4 @@
'''Buttplug controller'''
"""Buttplug controller"""
import asyncio
import logging
@ -8,8 +8,11 @@ from buttplug import Client, WebsocketConnector, ProtocolSpec
from fediplug.cli import options
async def connect_plug_client():
'''create Client object and connect plug client to Intiface Central or similar'''
from typing import Tuple
async def connect_plug_client() -> Client:
"""create Client object and connect plug client to Intiface Central or similar"""
plug_client = Client("fediplug", ProtocolSpec.v3)
connector = WebsocketConnector("ws://127.0.0.1:12345", logger=plug_client.logger)
@ -18,25 +21,28 @@ async def connect_plug_client():
except Exception as e:
logging.error(f"Could not connect to server, exiting: {e}")
return
print('plug client connected')
print("plug client connected")
return plug_client
async def scan_devices(plug_client):
async def scan_devices(plug_client: Client) -> Client:
# scan for devices for 5 seconds
await plug_client.start_scanning()
await asyncio.sleep(5)
await plug_client.stop_scanning()
plug_client.logger.info(f"Devices: {plug_client.devices}")
# If we have any device we can print the list of devices
# and access it by its ID: ( this step is done is trigger_actuators() )
# If we have any device we can print the list of devices
# and access it by its ID: ( this step is done is trigger_actuators() )
if len(plug_client.devices) != 0:
print(plug_client.devices)
print(len(plug_client.devices), "devices found")
return plug_client
async def trigger_actuators(plug_client, actuator_command):
async def trigger_actuators(
plug_client: Client, actuator_command: Tuple[float, float]
) -> None:
MAX_DURATION = 60 # maximum duration in seconds
MAX_POWER = 1 # has to be 0 <= n <= 1 or it will not work
duration = clamp(actuator_command[0], 0, MAX_DURATION)
@ -52,10 +58,12 @@ async def trigger_actuators(plug_client, actuator_command):
await actuator.command(power)
print("generic actuator")
await asyncio.sleep(duration)
#stops all actuators
# stops all actuators
for actuator in device.actuators:
await actuator.command(0)
'''
"""
# Some devices may have linear actuators which need a different command.
# The first parameter is the time duration in ms and the second the
# position for the linear axis (0.0-1.0).
@ -69,15 +77,18 @@ async def trigger_actuators(plug_client, actuator_command):
if len(device.rotatory_actuators) != 0:
await device.rotatory_actuators[0].command(0.5, True)
print("rotary actuator")
'''
"""
async def disconnect_plug_client(plug_client):
async def disconnect_plug_client(plug_client: Client) -> None:
# Disconnect the plug_client.
await plug_client.disconnect()
def clamp(n, smallest, largest):
'''returns the closest value to n still in range (clamp)'''
def clamp(n: float, smallest: float, largest: float) -> float:
"""returns the closest value to n still in range (clamp)"""
return max(smallest, min(n, largest))
# First things first. We set logging to the console and at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

View file

@ -1,8 +1,13 @@
'''Entry point for command-line interface.'''
"""Entry point for command-line interface."""
options = {'debug': False}
from typing import Dict, Tuple
options: Dict[str, bool] = {
"debug": False
} # needs to be initialized before imports, to avoid circular import error (yes, i know...)
import os
path = os.path
import sys
@ -14,67 +19,79 @@ import fediplug.mastodon as mastodon
import fediplug.keyring as keyring
import fediplug.buttplugio as buttplugio
def get_access_token(instance):
'''Ensure the user credential exists.'''
def get_access_token(instance: str) -> str:
"""Ensure the user credential exists."""
keyring.migrate_access_token(instance)
if not keyring.has_credential(instance, keyring.CREDENTIAL_ACCESS_TOKEN):
click.echo(f'user credential for {instance} does not exist; try `fediplug login`')
click.echo(
f"user credential for {instance} does not exist; try `fediplug login`"
)
sys.exit(1)
return keyring.get_credential(instance, keyring.CREDENTIAL_ACCESS_TOKEN)
def get_client_credentials(instance):
'''Ensure the client credentials exist.'''
def get_client_credentials(instance: str) -> Tuple[str, str]:
"""Ensure the client credentials exist."""
keyring.migrate_client_credentials(instance)
if not (keyring.has_credential(instance, keyring.CREDENTIAL_CLIENT_ID) and
keyring.has_credential(instance, keyring.CREDENTIAL_CLIENT_SECRET)):
click.echo(f'client credentials for {instance} do not exist; try `fediplug register`')
if not (
keyring.has_credential(instance, keyring.CREDENTIAL_CLIENT_ID)
and keyring.has_credential(instance, keyring.CREDENTIAL_CLIENT_SECRET)
):
click.echo(
f"client credentials for {instance} do not exist; try `fediplug register`"
)
sys.exit(1)
return (
keyring.get_credential(instance, keyring.CREDENTIAL_CLIENT_ID),
keyring.get_credential(instance, keyring.CREDENTIAL_CLIENT_SECRET)
keyring.get_credential(instance, keyring.CREDENTIAL_CLIENT_SECRET),
)
@click.group()
@click.option('-d', '--debug', is_flag=True, help='Print debug messages.')
def cli(debug):
'''A program to play music your friends post on Mastodon.'''
options['debug'] = debug
@click.group()
@click.option("-d", "--debug", is_flag=True, help="Print debug messages.")
def cli(debug: bool) -> None:
"""A program to play music your friends post on Mastodon."""
options["debug"] = debug
@cli.command()
@click.argument('instance')
def register(instance):
'''Register fediplug on your Mastodon instance.'''
@click.argument("instance")
def register(instance: str) -> None:
"""Register fediplug on your Mastodon instance."""
mastodon.register(instance)
@cli.command()
@click.argument('instance')
def login(instance):
'''Log in to your Mastodon instance.'''
@click.argument("instance")
def login(instance: str) -> None:
"""Log in to your Mastodon instance."""
client_id, client_secret = get_client_credentials(instance)
click.echo('Open this page in your browser and follow the instructions.')
click.echo('Paste the code here.')
click.echo('')
click.echo("Open this page in your browser and follow the instructions.")
click.echo("Paste the code here.")
click.echo("")
click.echo(mastodon.get_auth_request_url(instance, client_id, client_secret))
click.echo('')
click.echo("")
grant_code = input('Code: ')
grant_code = input("Code: ")
mastodon.login(instance, client_id, client_secret, grant_code)
@cli.command()
@click.argument('instance')
@click.argument('users', nargs=-1)
def stream(instance, users):
'''Control buttplug.io device from your timeline.'''
@click.argument("instance")
@click.argument("users", nargs=-1)
def stream(instance: str, users: Tuple[str]):
"""Control buttplug.io device from your timeline."""
event_loop = asyncio.get_event_loop()
plug_client = event_loop.run_until_complete(buttplugio.connect_plug_client())
@ -83,4 +100,6 @@ def stream(instance, users):
client_id, client_secret = get_client_credentials(instance)
access_token = get_access_token(instance)
mastodon.stream(instance, users, client_id, client_secret, access_token, plug_client, event_loop)
mastodon.stream(
instance, users, client_id, client_secret, access_token, plug_client, event_loop
)

View file

@ -1,6 +1,6 @@
'''Application directories.'''
"""Application directories."""
from appdirs import AppDirs
DIRS = AppDirs('fediplug', appauthor=False)
DIRS = AppDirs("fediplug", appauthor=False)

View file

@ -1,6 +1,8 @@
'''Secret storage.'''
"""Secret storage."""
import os
from typing import Optional
path = os.path
import click
@ -9,29 +11,34 @@ from keyring import get_password, set_password
from fediplug.dirs import DIRS
SERVICE_NAME = 'fediplug'
CREDENTIAL_CLIENT_ID = 'client_id'
CREDENTIAL_CLIENT_SECRET = 'client_secret'
CREDENTIAL_ACCESS_TOKEN = 'access_token'
SERVICE_NAME: str = "fediplug"
CREDENTIAL_CLIENT_ID: str = "client_id"
CREDENTIAL_CLIENT_SECRET: str = "client_secret"
CREDENTIAL_ACCESS_TOKEN: str = "access_token"
def build_username(instance, credential_kind):
return credential_kind + '@' + instance
def set_credential(instance, credential_kind, credential):
def build_username(instance: str, credential_kind: str) -> str:
return credential_kind + "@" + instance
def set_credential(instance: str, credential_kind: str, credential: str) -> None:
set_password(SERVICE_NAME, build_username(instance, credential_kind), credential)
def get_credential(instance, credential_kind):
def get_credential(instance: str, credential_kind: str) -> Optional[str]:
return get_password(SERVICE_NAME, build_username(instance, credential_kind))
def has_credential(instance, credential_kind):
def has_credential(instance: str, credential_kind: str) -> bool:
return get_credential(instance, credential_kind) is not None
def migrate_client_credentials(instance):
def migrate_and_unlink(filename):
if path.exists(filename):
click.echo('==> Migrating client credentials to keyring from ' + filename)
with open(filename, 'r', encoding='utf-8') as infile:
def migrate_client_credentials(instance: str) -> None:
def migrate_and_unlink(filename: str) -> None:
if path.exists(filename):
click.echo("==> Migrating client credentials to keyring from " + filename)
with open(filename, "r", encoding="utf-8") as infile:
client_id = infile.readline().strip()
client_secret = infile.readline().strip()
@ -40,20 +47,21 @@ def migrate_client_credentials(instance):
os.unlink(filename)
migrate_and_unlink('clientcred.secret')
migrate_and_unlink(path.join(DIRS.user_config_dir, instance + '.clientcred.secret'))
migrate_and_unlink("clientcred.secret")
migrate_and_unlink(path.join(DIRS.user_config_dir, instance + ".clientcred.secret"))
def migrate_access_token(instance):
def migrate_and_unlink(filename):
def migrate_access_token(instance: str) -> None:
def migrate_and_unlink(filename: str) -> None:
if path.exists(filename):
click.echo('==> Migrating access token to keyring from ' + filename)
click.echo("==> Migrating access token to keyring from " + filename)
with open(filename, 'r', encoding='utf-8') as infile:
access_token = infile.readline().strip()
with open(filename, "r", encoding="utf-8") as infile:
access_token: str = infile.readline().strip()
set_credential(instance, CREDENTIAL_ACCESS_TOKEN, access_token)
os.unlink(filename)
migrate_and_unlink('usercred.secret')
migrate_and_unlink(path.join(DIRS.user_config_dir, instance + '.usercred.secret'))
migrate_and_unlink("usercred.secret")
migrate_and_unlink(path.join(DIRS.user_config_dir, instance + ".usercred.secret"))

View file

@ -1,6 +1,5 @@
'''Mastodon interface.'''
"""Mastodon interface."""
LISTEN_TO_HASHTAG = 'fediplug'
import click
import lxml.html as lh
@ -13,18 +12,26 @@ from fediplug.cli import options
import fediplug.keyring as keyring
from fediplug.buttplugio import trigger_actuators
LISTEN_TO_HASHTAG = "fediplug"
Mastodon = mastodon.Mastodon
def api_base_url(instance):
'''Create an API base url from an instance name.'''
def api_base_url(instance: str) -> str:
"""Create an API base url from an instance name."""
return "https://" + instance
return 'https://' + instance
class StreamListener(mastodon.StreamListener):
'''Listens to a Mastodon timeline and adds buttplug instructions the given queue.'''
"""Listens to a Mastodon timeline and adds buttplug instructions the given queue."""
def __init__(self, plug_client, instance, users, event_loop):
def __init__(
self,
plug_client,
instance: str,
users: list[str],
event_loop: object,
) -> None:
self.plug_client = plug_client
self.instance = instance
self.users = users
@ -42,105 +49,132 @@ class StreamListener(mastodon.StreamListener):
# input: "10s6 80%"
# output: []
#
# watch out for this quirk:
# watch out for this quirk:
# input "10s 70%8"
# output: ["10s 70%"]
# TODO: fix this, it should match the 70% because there isnt a word boundary after it
if options['debug']:
print(rf'listener initialized with users={self.users}')
if options["debug"]:
print(rf"listener initialized with users={self.users}")
def on_update(self, status):
if options['debug']:
print(rf'incoming status: acct={status.account.acct}')
def on_update(self, status: dict) -> None:
if options["debug"]:
print(rf"incoming status: acct={status.account.acct}")
if self.users and normalize_username(status.account.acct, self.instance) not in self.users:
if (
self.users
and normalize_username(status.account.acct, self.instance) not in self.users
):
# TODO: only do this if no toot from self.users with #fediplug has been captured yet, else check in_reply_to_ID
if options['debug']:
print('skipping status due to username filtering')
if options["debug"]:
print("skipping status due to username filtering")
return
tags = extract_tags(status)
if options['debug']:
print(rf'expecting: {LISTEN_TO_HASHTAG}, extracted tags: {tags}')
if options["debug"]:
print(rf"expecting: {LISTEN_TO_HASHTAG}, extracted tags: {tags}")
if LISTEN_TO_HASHTAG in tags:
# TODO: if Hashtag matches and toot is from mentioned account, then get toot ID
''' Here we extract the instructions for the butplug'''
buttplug_instructions = extract_buttplug_instructions(status, self.regular_expression)
"""Here we extract the instructions for the butplug"""
buttplug_instructions = extract_buttplug_instructions(
status, self.regular_expression
)
if buttplug_instructions: # check if buttplug_instructions is not empty
for buttplug_instruction in buttplug_instructions:
click.echo(f'queueing instructions {buttplug_instruction}')
self.event_loop.run_until_complete(trigger_actuators(self.plug_client, buttplug_instruction))
click.echo(f"queueing instructions {buttplug_instruction}")
self.event_loop.run_until_complete(
trigger_actuators(self.plug_client, buttplug_instruction)
)
def register(instance):
'''Register fediplug to a Mastodon server and save the client credentials.'''
"""Register fediplug to a Mastodon server and save the client credentials."""
client_id, client_secret = Mastodon.create_app('fediplug', scopes=['read'], api_base_url=api_base_url(instance))
client_id, client_secret = Mastodon.create_app(
"fediplug", scopes=["read"], api_base_url=api_base_url(instance)
)
keyring.set_credential(instance, keyring.CREDENTIAL_CLIENT_ID, client_id)
keyring.set_credential(instance, keyring.CREDENTIAL_CLIENT_SECRET, client_secret)
def build_client(instance, client_id, client_secret, access_token=None):
'''Builds a Mastodon client.'''
return Mastodon(api_base_url=api_base_url(instance),
client_id=client_id, client_secret=client_secret, access_token=access_token)
def build_client(instance, client_id, client_secret, access_token=None):
"""Builds a Mastodon client."""
return Mastodon(
api_base_url=api_base_url(instance),
client_id=client_id,
client_secret=client_secret,
access_token=access_token,
)
def get_auth_request_url(instance, client_id, client_secret):
'''Gets an authorization request URL from a Mastodon instance.'''
"""Gets an authorization request URL from a Mastodon instance."""
return build_client(instance, client_id, client_secret).auth_request_url(
scopes=["read"]
)
return build_client(instance, client_id, client_secret).auth_request_url(scopes=['read'])
def login(instance, client_id, client_secret, grant_code):
'''Log in to a Mastodon server and save the user credentials.'''
"""Log in to a Mastodon server and save the user credentials."""
client = build_client(instance, client_id, client_secret)
access_token = client.log_in(code=grant_code, scopes=['read'])
access_token = client.log_in(code=grant_code, scopes=["read"])
keyring.set_credential(instance, keyring.CREDENTIAL_ACCESS_TOKEN, access_token)
def stream(instance, users, client_id, client_secret, access_token, plug_client, event_loop):
'''Stream statuses and add them to a queue.'''
def stream(
instance, users, client_id, client_secret, access_token, plug_client, event_loop
):
"""Stream statuses and add them to a queue."""
client = build_client(instance, client_id, client_secret, access_token)
users = [normalize_username(user, instance) for user in users]
listener = StreamListener(plug_client, instance, users, event_loop)
click.echo(f'==> Streaming from {instance}')
click.echo(f"==> Streaming from {instance}")
client.stream_user(listener)
def extract_tags(toot):
'''Extract tags from a toot.'''
return [tag['name'] for tag in toot['tags']]
def extract_tags(toot):
"""Extract tags from a toot."""
return [tag["name"] for tag in toot["tags"]]
def normalize_username(user, instance):
user = user.lstrip('@')
parts = user.split('@')
if options['debug']:
print(rf'parts: {parts}')
user = user.lstrip("@")
parts = user.split("@")
if options["debug"]:
print(rf"parts: {parts}")
if len(parts) == 1 or parts[1] == instance:
return parts[0]
else:
return user
def extract_buttplug_instructions(status, regular_expression):
'''Extract buttplug instruction informations from a toot.'''
toot = lh.fromstring(status['content'])
"""Extract buttplug instruction informations from a toot."""
toot = lh.fromstring(status["content"])
toot = clean_html(toot)
toot = toot.text_content()
instructions = regular_expression.findall(toot)
actuator_commands = [] # List of tuples with (duration in seconds, power in range 0..1)
actuator_commands = (
[]
) # List of tuples with (duration in seconds, power in range 0..1)
for instruction in instructions:
commands = instruction.strip().split(" ")
print(commands)
if commands[-1][-1] != "%":
commands.append("100%")
commands = [int(command[:-1]) for command in commands]
power = commands.pop()/100 # convert power from % to range 0..1
power = commands.pop() / 100 # convert power from % to range 0..1
commands = list(zip(commands, cycle([power])))
print(commands)
actuator_commands.extend(commands)
print(rf'extracted buttplug_instruction: {actuator_commands}')
return actuator_commands
print(rf"extracted buttplug_instruction: {actuator_commands}")
return actuator_commands

View file

@ -6,17 +6,22 @@ from fediplug.queue import build_play_command
def test_extract_links():
toot = {
'content': "<p><a href=\"https://cybre.space/tags/nowplaying\" class=\"mention hashtag\" rel=\"tag\">#<span>nowplaying</span></a> <a href=\"https://cybre.space/tags/fediplug\" class=\"mention hashtag\" rel=\"tag\">#<span>fediplug</span></a> Grimes ft. Janelle Mon\u00e1e - Venus Fly <a href=\"https://www.youtube.com/watch?v=eTLTXDHrgtw\" rel=\"nofollow noopener\" target=\"_blank\"><span class=\"invisible\">https://www.</span><span class=\"ellipsis\">youtube.com/watch?v=eTLTXDHrgt</span><span class=\"invisible\">w</span></a></p>"
"content": '<p><a href="https://cybre.space/tags/nowplaying" class="mention hashtag" rel="tag">#<span>nowplaying</span></a> <a href="https://cybre.space/tags/fediplug" class="mention hashtag" rel="tag">#<span>fediplug</span></a> Grimes ft. Janelle Mon\u00e1e - Venus Fly <a href="https://www.youtube.com/watch?v=eTLTXDHrgtw" rel="nofollow noopener" target="_blank"><span class="invisible">https://www.</span><span class="ellipsis">youtube.com/watch?v=eTLTXDHrgt</span><span class="invisible">w</span></a></p>'
}
urls = extract_links(toot)
assert urls == ['https://www.youtube.com/watch?v=eTLTXDHrgtw']
assert urls == ["https://www.youtube.com/watch?v=eTLTXDHrgtw"]
def test_build_play_command_default():
environ.pop('FEDIPLAY_PLAY_COMMAND')
play_command = build_play_command('Awesome Music.mp3')
assert play_command == 'ffplay -v 0 -nostats -hide_banner -autoexit -nodisp \'Awesome Music.mp3\''
environ.pop("FEDIPLAY_PLAY_COMMAND")
play_command = build_play_command("Awesome Music.mp3")
assert (
play_command
== "ffplay -v 0 -nostats -hide_banner -autoexit -nodisp 'Awesome Music.mp3'"
)
def test_build_play_command_specified():
environ.update(FEDIPLAY_PLAY_COMMAND='afplay {filename}')
play_command = build_play_command('Awesome Music.mp3')
assert play_command == 'afplay \'Awesome Music.mp3\''
environ.update(FEDIPLAY_PLAY_COMMAND="afplay {filename}")
play_command = build_play_command("Awesome Music.mp3")
assert play_command == "afplay 'Awesome Music.mp3'"