9 changes: 6 additions & 3 deletions .dockerignore
@@ -7,8 +7,8 @@
# Documentation
*.md
docs/
*.txt
LICENSE
# Note: requirements.txt is NOT ignored (needed for build)

# Development files
.vscode/
@@ -124,8 +124,8 @@ node_modules/
# Optional npm cache directory
.npm

# Storage and data directories (only for build context)
storage/
# Storage data directories (NOT the storage module)
# Note: the storage/ module is needed; storage/input, storage/output, etc. are not
storage/input/
storage/output/
storage/temp/
data/
tmp/
temp/
37 changes: 37 additions & 0 deletions alembic/versions/005_add_batch_columns.py
@@ -0,0 +1,37 @@
"""Add batch_id and batch_index columns to jobs table

Revision ID: 005
Revises: 004
Create Date: 2025-01-20

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '005'
down_revision: Union[str, None] = '004'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add batch_id and batch_index columns to jobs table."""
    # Add batch_id column for batch processing
    op.add_column('jobs', sa.Column('batch_id', sa.String(), nullable=True))

    # Add batch_index column for ordering within a batch
    op.add_column('jobs', sa.Column('batch_index', sa.Integer(), nullable=True))

    # Create an index on batch_id for faster batch queries
    op.create_index('ix_jobs_batch_id', 'jobs', ['batch_id'])


def downgrade() -> None:
    """Remove batch columns from jobs table."""
    op.drop_index('ix_jobs_batch_id', 'jobs')
    op.drop_column('jobs', 'batch_index')
    op.drop_column('jobs', 'batch_id')
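
For context, these columns exist to make per-batch lookups cheap and ordered. A minimal sketch of the kind of query the new index serves (hypothetical helper; assumes the async SQLAlchemy session and Job model used elsewhere in this PR):

# Hypothetical sketch, not part of this diff.
from sqlalchemy import select

from api.models.job import Job

async def get_batch_jobs(db, batch_id: str):
    result = await db.execute(
        select(Job)
        .where(Job.batch_id == batch_id)   # served by ix_jobs_batch_id
        .order_by(Job.batch_index)         # submission order within the batch
    )
    return result.scalars().all()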
45 changes: 45 additions & 0 deletions annotated_doc.py
@@ -0,0 +1,45 @@
"""
Annotated Doc compatibility module.

Provides the Doc annotation for FastAPI documentation.
Uses typing_extensions.Doc when available (Python 3.9+ with typing_extensions >= 4.9.0),
otherwise provides a simple fallback implementation.
"""

try:
    from typing_extensions import Doc
except ImportError:
    class Doc:
        """
        Documentation annotation for Annotated types.

        Used to provide documentation for type annotations in FastAPI endpoints.
        Falls back to a simple implementation if typing_extensions is not available.

        Example:
            from typing import Annotated
            from annotated_doc import Doc

            def endpoint(
                user_id: Annotated[str, Doc("The user's unique identifier")]
            ):
                pass
        """
        __slots__ = ('documentation',)

        def __init__(self, documentation: str) -> None:
            self.documentation = documentation

        def __repr__(self) -> str:
            return f"Doc({self.documentation!r})"

        def __hash__(self) -> int:
            return hash(self.documentation)

        def __eq__(self, other: object) -> bool:
            if isinstance(other, Doc):
                return self.documentation == other.documentation
            return NotImplemented


__all__ = ['Doc']
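
Because the fallback defines __eq__ and __hash__ in terms of the documentation string, two Doc annotations with the same text compare and hash equal; a quick usage sketch of those semantics:

# Sketch of the fallback's semantics, not part of this diff.
from typing import Annotated

from annotated_doc import Doc

assert Doc("user id") == Doc("user id")
assert hash(Doc("user id")) == hash(Doc("user id"))

UserId = Annotated[str, Doc("The user's unique identifier")]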
1 change: 1 addition & 0 deletions api/config.py
@@ -49,6 +49,7 @@ class Settings(BaseSettings):
    TEMP_PATH: str = "/tmp/rendiff"

    # Worker
    WORKER_TYPE: str = "cpu"  # cpu, gpu, or analysis
    WORKER_CONCURRENCY: int = 4
    WORKER_PREFETCH_MULTIPLIER: int = 1
    WORKER_MAX_TASKS_PER_CHILD: int = 100
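
Since Settings extends pydantic's BaseSettings, the new field can be set per-deployment from the environment; a sketch, assuming the default BaseSettings behavior of reading environment variables by field name (no custom env prefix):

# Hypothetical sketch, not part of this diff.
import os

from api.config import Settings

os.environ["WORKER_TYPE"] = "gpu"   # e.g. set in the GPU worker container
settings = Settings()
assert settings.WORKER_TYPE == "gpu"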
28 changes: 16 additions & 12 deletions api/routers/admin.py
@@ -15,15 +15,19 @@
from api.config import settings
from api.dependencies import DatabaseSession, require_api_key
from api.models.job import Job, JobStatus, ErrorResponse
from api.services.queue import QueueService
from api.services.storage import StorageService
from pydantic import BaseModel

logger = structlog.get_logger()
router = APIRouter()

queue_service = QueueService()
storage_service = StorageService()
# Lazy import to avoid circular dependency
def get_queue_service():
    from api.main import queue_service
    return queue_service

def get_storage_service():
    from api.main import storage_service
    return storage_service
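
These getters defer the import to call time so that api.main can finish constructing the services before any router touches them. A rough sketch of the arrangement this assumes in api/main.py (hypothetical; the startup code itself is not part of this diff):

# api/main.py -- hypothetical sketch, not part of this diff.
from fastapi import FastAPI

from api.services.queue import QueueService
from api.services.storage import StorageService

# Module-level singletons that the routers' lazy getters resolve at call time.
# Because the routers import api.main inside function bodies rather than at
# module import, api.main can import the routers without a circular import.
queue_service = QueueService()
storage_service = StorageService()

app = FastAPI()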


# Response models for OpenAPI documentation
@@ -124,7 +128,7 @@ async def get_workers_status(
    Only accessible with admin API key.
    """
    try:
        workers = await queue_service.get_workers_status()
        workers = await get_queue_service().get_workers_status()

        return WorkersStatusResponse(
            total_workers=len(workers),
@@ -169,7 +173,7 @@ async def get_storage_status(
    try:
        storage_status = {}

        for name, backend in storage_service.backends.items():
        for name, backend in get_storage_service().backends.items():
            try:
                # Get backend-specific status
                backend_status = await backend.get_status()
@@ -186,8 +190,8 @@

        return StorageStatusResponse(
            backends=storage_status,
            default_backend=storage_service.config.get("default_backend"),
            policies=storage_service.config.get("policies", {}),
            default_backend=get_storage_service().config.get("default_backend"),
            policies=get_storage_service().config.get("policies", {}),
        )
    except Exception as e:
        logger.error("Failed to get storage status", error=str(e))
@@ -267,8 +271,8 @@ async def get_system_stats(
"avg_processing_time": sum(row.avg_time or 0 for row in job_stats) / len(job_stats) if job_stats else 0,
"avg_vmaf_score": sum(row.avg_vmaf or 0 for row in job_stats if row.avg_vmaf) / sum(1 for row in job_stats if row.avg_vmaf) if any(row.avg_vmaf for row in job_stats) else None,
},
queue=await queue_service.get_queue_stats(),
workers=await queue_service.get_workers_stats(),
queue=await get_queue_service().get_queue_stats(),
workers=await get_queue_service().get_workers_stats(),
)

return stats
@@ -337,8 +341,8 @@ async def cleanup_old_jobs(
        try:
            # Delete output file if it exists
            if job.output_path:
                backend_name, file_path = storage_service.parse_uri(job.output_path)
                backend = storage_service.backends.get(backend_name)
                backend_name, file_path = get_storage_service().parse_uri(job.output_path)
                backend = get_storage_service().backends.get(backend_name)
                if backend:
                    await backend.delete(file_path)
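
The cleanup above leans on storage URIs that parse_uri can split into a backend name plus a backend-relative path. The real implementation lives in api/services/storage.py and is not shown in this diff; a hypothetical sketch of the convention, assuming backend://path style URIs:

# Hypothetical sketch, not part of this diff.
def parse_uri(uri: str) -> tuple[str, str]:
    # "s3://bucket/results/out.mp4" -> ("s3", "bucket/results/out.mp4")
    scheme, sep, path = uri.partition("://")
    if not sep:
        raise ValueError(f"not a storage URI: {uri!r}")
    return scheme, path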
18 changes: 11 additions & 7 deletions api/routers/batch.py
@@ -13,17 +13,21 @@
from api.config import settings
from api.dependencies import DatabaseSession, RequiredAPIKey
from api.models.job import Job, JobStatus, JobResponse, ErrorResponse
from api.services.queue import QueueService
from api.services.storage import StorageService
from api.utils.validators import validate_input_path, validate_output_path, validate_operations
from api.utils.media_validator import media_validator
from pydantic import BaseModel, Field

logger = structlog.get_logger()
router = APIRouter()

queue_service = QueueService()
storage_service = StorageService()
# Lazy import to avoid circular dependency
def get_queue_service():
    from api.main import queue_service
    return queue_service

def get_storage_service():
    from api.main import storage_service
    return storage_service


class BatchJob(BaseModel):
@@ -248,7 +252,7 @@ async def create_batch_job(
        await db.refresh(job)

        # Queue the job
        await queue_service.enqueue_job(
        await get_queue_service().enqueue_job(
            job_id=str(job.id),
            priority=job_request.priority,
        )
@@ -487,9 +491,9 @@ async def cancel_batch(
        try:
            # Cancel job in queue
            if job.status == JobStatus.QUEUED:
                success = await queue_service.cancel_job(str(job.id))
                success = await get_queue_service().cancel_job(str(job.id))
            else:  # PROCESSING
                success = await queue_service.cancel_running_job(
                success = await get_queue_service().cancel_running_job(
                    str(job.id),
                    job.worker_id or ""
                )
17 changes: 12 additions & 5 deletions api/routers/convert.py
@@ -16,16 +16,21 @@
from api.config import settings
from api.dependencies import DatabaseSession, RequiredAPIKey
from api.models.job import Job, JobStatus, ConvertRequest, JobCreateResponse, JobResponse, ErrorResponse
from api.services.queue import QueueService
from api.services.storage import StorageService
from api.utils.validators import validate_input_path, validate_output_path, validate_operations

logger = structlog.get_logger()

router = APIRouter()

queue_service = QueueService()
storage_service = StorageService()
# Import services from main - they are initialized during app startup
# Lazy import to avoid circular dependency
def get_storage_service():
    from api.main import storage_service
    return storage_service

def get_queue_service():
    from api.main import queue_service
    return queue_service


@router.post(
@@ -104,6 +109,7 @@ async def convert_media(
    output_path = request.output if isinstance(request.output, str) else request.output.get("path")

    # Validate paths
    storage_service = get_storage_service()
    input_backend, input_validated = await validate_input_path(input_path, storage_service)
    output_backend, output_validated = await validate_output_path(output_path, storage_service)

@@ -153,9 +159,10 @@

    # Now we have a guaranteed unique job ID, queue it
    job_id_str = str(job.id)

    # Queue the job (do this before commit in case queuing fails)
    try:
        queue_service = get_queue_service()
        await queue_service.enqueue_job(
            job_id=job_id_str,
            priority=request.priority,
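
The comment above captures the intent: enqueue before committing so that a queue failure can still abort the transaction rather than leave a committed row no worker will ever pick up. A condensed sketch of that shape (hypothetical; the full error handling is not visible in this diff):

# Hypothetical sketch, not part of this diff.
async def create_and_enqueue(db, job, priority: int):
    db.add(job)
    await db.flush()  # assigns job.id without committing
    try:
        await get_queue_service().enqueue_job(job_id=str(job.id), priority=priority)
    except Exception:
        await db.rollback()  # queue failure leaves no orphaned row
        raise
    await db.commit()  # both the row and the queue entry exist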
18 changes: 11 additions & 7 deletions api/routers/health.py
@@ -15,15 +15,19 @@

from api.config import settings
from api.dependencies import DatabaseSession
from api.services.queue import QueueService
from api.services.storage import StorageService

logger = structlog.get_logger()

router = APIRouter()

queue_service = QueueService()
storage_service = StorageService()
# Lazy import to avoid circular dependency
def get_queue_service():
    from api.main import queue_service
    return queue_service

def get_storage_service():
    from api.main import storage_service
    return storage_service


# Response models for OpenAPI documentation
@@ -146,7 +150,7 @@ async def detailed_health_check(

    # Check queue
    try:
        queue_health = await queue_service.health_check()
        queue_health = await get_queue_service().health_check()
        health_status["components"]["queue"] = queue_health
    except Exception as e:
        health_status["status"] = "unhealthy"
@@ -157,7 +161,7 @@

    # Check storage backends
    try:
        storage_health = await storage_service.health_check()
        storage_health = await get_storage_service().health_check()
        health_status["components"]["storage"] = storage_health
    except Exception as e:
        health_status["status"] = "unhealthy"
@@ -283,7 +287,7 @@ async def get_capabilities() -> Dict[str, Any]:
"metrics": ["vmaf", "psnr", "ssim"],
"probing": ["format", "streams", "metadata"],
},
"storage_backends": list(storage_service.backends.keys()),
"storage_backends": list(get_storage_service().backends.keys()),
"hardware_acceleration": {
"available": await check_hardware_acceleration(),
"types": ["nvidia", "vaapi", "qsv", "videotoolbox"],
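
check_hardware_acceleration() is referenced here but defined elsewhere in the codebase. One common way to probe for it is ffmpeg's -hwaccels flag; a hypothetical sketch (not necessarily how this project implements it):

# Hypothetical sketch, not part of this diff.
import asyncio

async def check_hardware_acceleration() -> bool:
    # `ffmpeg -hwaccels` prints a header line followed by one
    # compiled-in acceleration method per line (e.g. cuda, vaapi).
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-hide_banner", "-hwaccels",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    stdout, _ = await proc.communicate()
    lines = stdout.decode().splitlines()[1:]
    return any(line.strip() for line in lines)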
12 changes: 7 additions & 5 deletions api/routers/jobs.py
@@ -18,13 +18,15 @@
from api.config import settings
from api.dependencies import DatabaseSession, RequiredAPIKey
from api.models.job import Job, JobStatus, JobResponse, JobListResponse, JobProgress, ErrorResponse
from api.services.queue import QueueService

logger = structlog.get_logger()

router = APIRouter()

queue_service = QueueService()
# Lazy import to avoid circular dependency
def get_queue_service():
    from api.main import queue_service
    return queue_service


@router.get(
@@ -288,10 +290,10 @@

    # Cancel in queue
    if job.status == JobStatus.QUEUED:
        await queue_service.cancel_job(str(job_id))
        await get_queue_service().cancel_job(str(job_id))
    elif job.status == JobStatus.PROCESSING:
        # Send cancel signal to worker
        await queue_service.cancel_running_job(str(job_id), job.worker_id)
        await get_queue_service().cancel_running_job(str(job_id), job.worker_id)

    # Update job status
    job.status = JobStatus.CANCELLED
@@ -471,7 +473,7 @@ async def get_job_logs(

    if job.status == JobStatus.PROCESSING and job.worker_id:
        # Get live logs from worker
        logs = await queue_service.get_worker_logs(job.worker_id, str(job_id), lines)
        logs = await get_queue_service().get_worker_logs(job.worker_id, str(job_id), lines)
    else:
        # Get stored logs from database and log aggregation system
        from api.services.job_service import JobService