fediplug/fediplay.py

148 lines
4.6 KiB
Python
Raw Normal View History

2018-01-25 04:13:34 +01:00
from os import environ, umask
2018-01-27 02:49:45 +01:00
import shlex
2017-12-26 22:34:39 +01:00
from subprocess import run
from threading import Thread, Lock
from lxml.etree import HTML
import mastodon
Mastodon = mastodon.Mastodon
from youtube_dl import YoutubeDL
class Queue(object):
def __init__(self):
self.lock = Lock()
self.playing = False
self.queue = []
def add(self, url):
filename = Getter().get(url)
with self.lock:
self.queue.append(filename)
if not self.playing:
self._play(self.queue.pop(0), self._play_finished)
def _play(self, filename, cb_complete):
self.playing = True
def run_thread(filename, cb_complete):
print('==> Playing', filename)
2018-01-27 02:49:45 +01:00
play_command = build_play_command(filename)
print('[executing]', play_command)
run(play_command, shell=True)
2018-01-25 02:49:30 +01:00
print('==> Playback complete')
2017-12-26 22:34:39 +01:00
cb_complete()
thread = Thread(target=run_thread, args=(filename, cb_complete))
thread.start()
def _play_finished(self):
with self.lock:
self.playing = False
if len(self.queue) > 0:
self._play(self.queue.pop(0), self._play_finished)
class Getter(object):
def _progress_hook(self, progress):
if progress['status'] == 'finished':
self.filename = progress['filename']
def get(self, url):
options = {
2018-01-25 04:13:34 +01:00
'format': 'mp3/mp4',
'nocheckcertificate': 'FEDIPLAY_NO_CHECK_CERTIFICATE' in environ,
'progress_hooks': [self._progress_hook]
}
2017-12-26 22:34:39 +01:00
with YoutubeDL(options) as downloader:
downloader.download([url])
return self.filename
class StreamListener(mastodon.StreamListener):
def __init__(self):
self.queue = Queue()
def on_update(self, status):
tags = extract_tags(status)
if 'fediplay' in tags:
links = extract_links(status)
self.queue.add(links[0])
def register(api_base_url):
2018-01-25 03:41:02 +01:00
old_umask = umask(0o77)
2017-12-26 22:34:39 +01:00
Mastodon.create_app('fediplay', api_base_url=api_base_url, to_file='clientcred.secret')
2018-01-25 03:41:02 +01:00
umask(old_umask)
2017-12-26 22:34:39 +01:00
def login(api_base_url, email, password):
client = Mastodon(client_id='clientcred.secret', api_base_url=api_base_url)
2018-01-25 03:41:02 +01:00
old_umask = umask(0o77)
2017-12-26 22:34:39 +01:00
client.log_in(email, password, to_file='usercred.secret')
2018-01-25 03:41:02 +01:00
umask(old_umask)
2017-12-26 22:34:39 +01:00
def login_with_code(api_base_url, grant_code):
client = Mastodon(client_id='clientcred.secret', api_base_url=api_base_url)
old_umask = umask(0o77)
client.log_in(code=grant_code, to_file='usercred.secret')
umask(old_umask)
2017-12-26 22:34:39 +01:00
def stream(api_base_url):
client = Mastodon(client_id='clientcred.secret', access_token='usercred.secret', api_base_url=api_base_url)
listener = StreamListener()
print('==> Streaming from', api_base_url)
client.stream_user(listener)
2017-12-26 22:34:39 +01:00
def extract_tags(toot):
return [tag['name'] for tag in toot['tags']]
2018-01-27 02:19:25 +01:00
def link_is_internal(link):
classes = link.attrib.get('class', '').split(' ')
2017-12-26 22:34:39 +01:00
if classes:
return 'mention' in classes
return False
def extract_links(toot):
html = HTML(toot['content'])
all_links = html.cssselect('a')
2018-01-27 02:19:25 +01:00
return [link.attrib['href'] for link in all_links if not link_is_internal(link)]
2017-12-26 22:34:39 +01:00
2018-01-27 02:49:45 +01:00
def build_play_command(filename):
escaped_filename = shlex.quote(filename)
template = environ.get(
'FEDIPLAY_PLAY_COMMAND',
'ffplay -v 0 -nostats -hide_banner -autoexit -nodisp {filename}'
)
return template.format(filename=escaped_filename)
2018-01-25 02:49:30 +01:00
def main():
from getpass import getpass
2018-01-25 04:13:34 +01:00
from os import path
from sys import exit, argv
2018-01-25 03:07:51 +01:00
api_base_url = environ.get('FEDIPLAY_API_BASE_URL')
if not api_base_url:
print('FEDIPLAY_API_BASE_URL environment variable not set')
exit(1)
2018-01-25 02:49:30 +01:00
2018-01-25 03:07:51 +01:00
if not path.exists('clientcred.secret'):
2018-01-25 02:49:30 +01:00
print('==> No clientcred.secret; registering application')
2018-01-25 03:07:51 +01:00
register(api_base_url)
2018-01-25 02:49:30 +01:00
2018-01-25 03:07:51 +01:00
if not path.exists('usercred.secret'):
2018-01-25 02:49:30 +01:00
print('==> No usercred.secret; logging in')
if '-b' in argv or '--browser' in argv:
client = Mastodon(client_id='clientcred.secret', api_base_url=api_base_url)
print("Open this page in your browser and follow the instructions to get the code you need, then paste it here")
print(client.auth_request_url())
grant_code = getpass('Code? ')
login_with_code(api_base_url, grant_code)
else:
email = input('Email: ')
password = getpass('Password: ')
login(api_base_url, email, password)
2018-01-25 02:49:30 +01:00
2018-01-25 03:07:51 +01:00
stream(api_base_url)
2018-01-25 02:49:30 +01:00
if __name__ == '__main__':
main()