initial commit

This commit is contained in:
Glyphscribe Ety
2025-04-26 23:45:24 -05:00
commit f60db11733
10 changed files with 856 additions and 0 deletions

110
helpers/clips.py Normal file
View File

@ -0,0 +1,110 @@
# Standard Imports
import sys
import subprocess
from pathlib import Path
from datetime import datetime
# Local Imports
import config
from helpers.peertube_api import authenticate_to_peertube, upload_to_peertube
from helpers.owncast_api import get_owncast_title, send_owncast_message
def get_recent_segments(directory: Path, n: int):
"""
Returns a filtered list of the most recent .ts files in a directory.
Args:
directory (Path): The directory to search for .ts files.
n (int): The maximum number of files to return.
Returns:
list[str]: A list of absolute paths of the most recent n files.
"""
files = [f for f in directory.iterdir() if f.is_file()
and f.suffix.lower() == '.ts']
files.sort(key=lambda f: f.stat().st_mtime, reverse=False)
return [str(f.resolve()) for f in files[:n]]
def merge_segments_to_mp4(segments: list[str], output_path: Path, output_filename: str):
"""
Accepts a list of .ts files and attempts to concatenate them via ffmpeg.
Args:
segments (list[str]): A list of absolute paths to .ts filenames.
output_path (Path): An absolute path of the directory to create the new output file.
output_filename (str): The filename of the file to be created.
Returns:
Path: An asbsolute path to the created file, or None if the process failed.
"""
output_file = output_path / output_filename
if output_file.exists():
print(f'File already exists at: {output_file}. Abandoning.')
return None
# concat:/path/to/file1|/path/to/file2|...
segment_list = 'concat:' + '|'.join([f'{f}' for f in segments])
command = ['ffmpeg',
'-protocol_whitelist', 'concat,file',
'-i', segment_list,
'-c', 'copy',
output_file
]
process = subprocess.Popen(
command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
print(f'ffmpeg ran into an issue creating {output_file}: {stderr}')
return None
return output_file
def generate_and_post_clip(user: str, title: str, timestamp: datetime):
"""
Invokes clip collection, remux, upload, and notification (chat) routines.
Args:
user (str): The name of the chat user that invoked the clip request.
title (str): The provided (or generated) title of the clip to be uploaded.
timestamp (datetime): The tz-aware timestamp indicating when clip was made.
Returns:
None
"""
recent_clips = get_recent_segments(
config.CLIP_DIRECTORY, config.CLIP_QUANTITY)
new_filename = f'{timestamp.strftime("%Y%m%d_%H%M%S_%f")}.mp4'
new_filepath = merge_segments_to_mp4(
recent_clips, config.TEMP_DIRECTORY, new_filename)
if (token := authenticate_to_peertube(config.PT_API_BASE, config.PEERTUBE_USER, config.PEERTUBE_PASS)) is None:
print(f'Failed to authenticate to PeerTube', file=sys.stderr)
return
# TODO: handle this gracefully
stream_title = ''
if (stream_title := get_owncast_title()):
stream_title = f'during the "{stream_title}" stream'
upload_options = {}
upload_options[
'description'] = f'🎞️ Clipped by **{user}** {stream_title} on {timestamp.strftime("%Y-%m-%d %H:%M:%S")}'
upload_options['description'] += f'\n🌐 {config.OWNCAST_HOST}'
if (video_url := upload_to_peertube(config.PT_API_BASE, token, new_filepath, 3800, title, **upload_options)) is None:
print(f'Failed to upload file to PeerTube', file=sys.stderr)
return
# TODO: handle this gracefully
clip_url = f'{config.PEERTUBE_HOST}{video_url}'
message = f'**{title}**: {clip_url}'
send_owncast_message(message)
print(f'Clip created by {user} at {timestamp}: {clip_url}')
return

57
helpers/owncast_api.py Normal file
View File

@ -0,0 +1,57 @@
# Standard imports
import json
import sys
# Package Imports
import requests
# Local Imports
import config
def get_owncast_title():
"""
Gets the current owncast stream title, if any, via API call
Returns:
str: The current stream title, if one is set; otherwise None.
"""
try:
response = requests.get(f'{config.OWNCAST_HOST}/api/status')
if response.status_code != 200:
print(
f'Received a {response.status_code} response from Owncast while getting the title: {response}', file=sys.stderr)
return None
data = response.json()
except json.JSONDecodeError as e:
print(
f'Unable to parse response JSON from Owncast: {e}: {response}', file=sys.stderr)
return None
return data.get('streamTitle')
# TODO this should probably return whether or not it was successful.
def send_owncast_message(message: str):
"""
Sends a chat message to the owncast server via self-contained API call.
Args:
message (str): The message to be sent to owncast as the Clip Bot user
Returns:
None
"""
try:
url = f'{config.OWNCAST_HOST}/api/integrations/chat/send'
headers = {'Authorization': f'Bearer {config.OWNCAST_AUTH}'}
response = requests.post(
url=url, headers=headers, json={'body': message})
if response.status_code != 200:
print(
f'Received a {response.status_code} response from Owncast while trying to send a message: {response}', file=sys.stderr)
except requests.exceptions.RequestException as e:
print(
f'Error occurred while sending a chat message: {e}', file=sys.stderr)
return

132
helpers/peertube_api.py Normal file
View File

@ -0,0 +1,132 @@
# Standard Imports
import sys
import json
from pprint import pprint
# Package Imports
import requests
import peertube
# Local Imports
from peertube.rest import ApiException
from pathlib import Path
def peertube_request(endpoint: str, verb: str, **kwargs):
"""
A helper function that facilitates basic requests to PeerTube and decodes.
Args:
endpoint (str): A URL to act against.
verb (str): either "GET" or "POST".
**kwargs: Passed through to underlying request call (for e.g. POSTs).
Returns:
dict: The JSON-decoded response from the request, or None if there was an error.
"""
data = kwargs.get('data', None)
try:
if verb == 'GET':
response = requests.get(endpoint)
elif verb == 'POST':
response = requests.post(endpoint, data=data)
else:
return None
status_code = response.status_code
data = response.json()
if status_code != 200:
print(
f'Received a {status_code} response from PeerTube: {response}', file=sys.stderr)
return None
except json.JSONDecodeError as e:
print(
f'Unable to parse response JSON from PeerTube: {e}: {response}', file=sys.stderr)
return None
except requests.exceptions.RequestException as e:
print(
f'An error occurred while communicating with PeerTube: {e}', file=sys.stderr)
return None
else:
return data
def authenticate_to_peertube(api_base: str, username: str, password: str):
"""
Collects an access token from PeerTube via password authentication
Args:
api_base (str): The API base url for the given PeerTube instance, e.g. https://video.site/api/v1
username (str): The username of the PeerTube account to be used in authentication
password (str): The password of the PeerTube account to be used in authentication
Returns:
str: A string containing the access_token following authentication, or None if unable to auth.
"""
if (data := peertube_request(api_base + '/oauth-clients/local', 'GET')) is None:
print('Error initializing OAuth flow with PeerTube.', file=sys.stderr)
return None
client_id = data['client_id']
client_secret = data['client_secret']
data = {
'client_id': client_id,
'client_secret': client_secret,
'grant_type': 'password',
'response_type': 'code',
'username': username,
'password': password
}
if (data := peertube_request(api_base + '/users/token', 'POST', data=data)) is None:
print('Error authenticating with PeerTube via OAuth.', file=sys.stderr)
return None
return data['access_token']
def upload_to_peertube(api_base: str, api_token: str, file: Path, channel_id: int, title: str, **kwargs):
"""
Uploads a file from the local filesystem to PeerTube and returns a ready-to-consume URL to the video.
Args:
api_base (str): The API base url for the given PeerTube instance, e.g. https://video.site/api/v1
api_token (str): A valid OAuth access token obtained from previous authentication.
file (Path): An absolute path to the video file to be uploaded.
channel_id (int): The internal ID number of the PeerTube channel to be uploaded to.
title (str): The public-facing name of the video on PeerTube
**kwargs: Any other values to pass to the PeerTube Video Upload API: https://framagit.org/framasoft/peertube/clients/python/-/blob/master/docs/VideoApi.md#videos_upload_post
Returns:
str: A string containing the URL to the video on PeerTube, or None if unable to publish.
"""
configuration = peertube.Configuration(
host=api_base
)
configuration.access_token = api_token
with peertube.ApiClient(configuration) as api_client:
api_instance = peertube.VideoApi(api_client)
try:
options = {
**kwargs,
'privacy': 1,
'wait_transcoding': True
}
api_response = api_instance.videos_upload_post(
file, channel_id, title, **options)
except ApiException as e:
print(
f'Exception while uploading to PeerTube: {e}', file=sys.stderr)
return None
try:
video_id = api_response.video.uuid
api_response = api_instance.videos_id_get(video_id)
except ApiException as e:
print(
f'Exception while getting Clip URL from PeerTube: {e}', file=sys.stderr)
return None
except KeyError as e:
print(
f'Could not find Video ID in response from PeerTube: {e}', file=sys.stderr)
return None
return api_response.embed_path

98
helpers/websocket.py Normal file
View File

@ -0,0 +1,98 @@
# Standard Imports
import re
import sys
import html
import time
import json
from datetime import datetime
from zoneinfo import ZoneInfo
# Package Imports
import websocket
# Local Imports
from helpers.clips import generate_and_post_clip
def owncast_on_message(ws, message):
"""
Handles incoming messages from websocket, filters for commands, and dispatches.
Args:
ws: The websocket object calling this handler.
message: The message received by the websocket.
Returns:
None
"""
try:
data = json.loads(message)
if 'type' in data and data['type'] == 'CHAT':
# TODO: add anti-spam and authentication checks
current_time = datetime.now(ZoneInfo("America/Chicago"))
chat_message = html.unescape(
re.sub('<.*?>', '', data['body'])).strip()
if chat_message == '!clip' or chat_message.startswith('!clip '):
if len(chat_message) == 5:
clip_title = f'Clip by {data["user"]["displayName"]} on {current_time.strftime("%Y-%m-%d %H:%M:%S")}'
generate_and_post_clip(
data['user']['displayName'], clip_title, current_time)
elif len(chat_message) > 5 and len(chat_message) < 126:
generate_and_post_clip(
data['user']['displayName'], chat_message[6:], current_time)
else:
# TODO: report this error back to the chat
return
except json.JSONDecodeError as e:
print(f'Failed to decode incoming owncast websocket message.', file=sys.stderr)
def owncast_on_error(ws, error):
print(f'[WS-ERR] {error}', file=sys.stderr)
def owncast_on_close(ws, code, message):
print(f'Owncast websocket closed (code: {code}): {message}')
def owncast_on_open(ws):
print('Owncast websocket opened successfully.')
def owncast_connect(url: str, retries: int, initial_backoff: float):
"""
Attempts to connect to the Owncast websocket and auto-reconnect with backoff
Args:
url (str): The websocket service url to connect to.
retries (int): The maximum number of reconnect attemps before failing.
initial_backoff (float): The base amount of reconnection backoff time in seconds
Returns:
None
"""
attempt = 0
backoff = initial_backoff
while attempt < retries:
ws = websocket.WebSocketApp(url,
on_open=owncast_on_open,
on_message=owncast_on_message,
on_error=owncast_on_error,
on_close=owncast_on_close
)
try:
ws.run_forever()
except KeyboardInterrupt:
print('\nInterrupted by user. Exiting.')
sys.exit(0)
attempt += 1
if attempt < retries:
print(f'Reconnecting in {backoff:.1f}s ({attempt} of {retries})')
time.sleep(backoff)
backoff *= 2
print(f'Max reconnect attempts reached. Exiting.')