133 lines
4.4 KiB
Python
133 lines
4.4 KiB
Python
# Standard Imports
|
|
import sys
|
|
import subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Local Imports
|
|
import config
|
|
from helpers.peertube_api import authenticate_to_peertube, upload_to_peertube
|
|
from helpers.owncast_api import get_owncast_title, send_owncast_message
|
|
|
|
|
|
def get_recent_segments(directory: Path, n: int):
|
|
"""
|
|
Returns a filtered list of the most recent .ts files in a directory.
|
|
|
|
Args:
|
|
directory (Path): The directory to search for .ts files.
|
|
n (int): The maximum number of files to return.
|
|
|
|
Returns:
|
|
list[str]: A list of absolute paths of the most recent n files.
|
|
"""
|
|
|
|
files = [f for f in directory.iterdir() if f.is_file()
|
|
and f.suffix.lower() == '.ts']
|
|
files.sort(key=lambda f: f.stat().st_mtime, reverse=False)
|
|
return [str(f.resolve()) for f in files[:n]]
|
|
|
|
|
|
def merge_segments_to_mp4(segments: list[str], output_path: Path, output_filename: str):
|
|
"""
|
|
Accepts a list of .ts files and attempts to concatenate them via ffmpeg.
|
|
|
|
Args:
|
|
segments (list[str]): A list of absolute paths to .ts filenames.
|
|
output_path (Path): An absolute path of the directory to create the new output file.
|
|
output_filename (str): The filename of the file to be created.
|
|
|
|
Returns:
|
|
Path: An asbsolute path to the created file, or None if the process failed.
|
|
"""
|
|
|
|
output_file = output_path / output_filename
|
|
if output_file.exists():
|
|
print(f'File already exists at: {output_file}. Abandoning.')
|
|
return None
|
|
|
|
# concat:/path/to/file1|/path/to/file2|...
|
|
segment_list = 'concat:' + '|'.join([f'{f}' for f in segments])
|
|
command = ['ffmpeg',
|
|
'-protocol_whitelist', 'concat,file',
|
|
'-i', segment_list,
|
|
'-c', 'copy',
|
|
output_file
|
|
]
|
|
|
|
process = subprocess.Popen(
|
|
command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
stdout, stderr = process.communicate()
|
|
|
|
if process.returncode != 0:
|
|
print(f'ffmpeg ran into an issue creating {output_file}: {stderr}')
|
|
return None
|
|
return output_file
|
|
|
|
|
|
def delete_clip(filename: str):
|
|
"""
|
|
Deletes a named file from the temporary directory
|
|
|
|
Args:
|
|
filename (str): The name of the file to be deleted, e.g. file.mp4
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
|
|
file = config.TEMP_DIRECTORY / filename
|
|
try:
|
|
file.unlink()
|
|
except FileNotFoundError:
|
|
print(f'Error deleting temporary file: {file}; file not found.')
|
|
except Exception as e:
|
|
print(f'Error deleting temporary file: {file}; {e}.')
|
|
return
|
|
|
|
|
|
def generate_and_post_clip(user: str, title: str, timestamp: datetime):
|
|
"""
|
|
Invokes clip collection, remux, upload, and notification (chat) routines.
|
|
|
|
Args:
|
|
user (str): The name of the chat user that invoked the clip request.
|
|
title (str): The provided (or generated) title of the clip to be uploaded.
|
|
timestamp (datetime): The tz-aware timestamp indicating when clip was made.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
|
|
if not (recent_clips := get_recent_segments(config.CLIP_DIRECTORY, config.CLIP_QUANTITY)):
|
|
return
|
|
new_filename = f'{timestamp.strftime("%Y%m%d_%H%M%S_%f")}.mp4'
|
|
new_filepath = merge_segments_to_mp4(
|
|
recent_clips, config.TEMP_DIRECTORY, new_filename)
|
|
|
|
if (token := authenticate_to_peertube(config.PT_API_BASE, config.PEERTUBE_USER, config.PEERTUBE_PASS)) is None:
|
|
print(f'Failed to authenticate to PeerTube', file=sys.stderr)
|
|
delete_clip(new_filename)
|
|
return
|
|
|
|
stream_title = ''
|
|
if (stream_title := get_owncast_title()):
|
|
stream_title = f'during the "{stream_title}" stream'
|
|
|
|
upload_options = {}
|
|
upload_options[
|
|
'description'] = f'🎞️ Clipped by **{user}** {stream_title} on {timestamp.strftime("%Y-%m-%d %H:%M:%S")}'
|
|
upload_options['description'] += f'\n🌐 {config.OWNCAST_HOST}'
|
|
if (video_url := upload_to_peertube(config.PT_API_BASE, token, new_filepath, config.PEERTUBE_CHANNEL_ID, title, **upload_options)) is None:
|
|
print(f'Failed to upload file to PeerTube', file=sys.stderr)
|
|
delete_clip(new_filename)
|
|
return
|
|
delete_clip(new_filename)
|
|
|
|
clip_url = f'{config.PEERTUBE_HOST}{video_url}'
|
|
message = f'**{title}**: {clip_url}'
|
|
send_owncast_message(message)
|
|
|
|
print(f'Clip created by {user} at {timestamp}: {clip_url}')
|
|
return
|