From 26de181876a0ba2574ef36726738ccc77dbc2204 Mon Sep 17 00:00:00 2001 From: aloke majumder Date: Mon, 4 Aug 2025 15:44:53 +0530 Subject: [PATCH] Fixed Issues --- README.md | 19 +- api/main.py | 3 +- api/models/job.py | 4 + api/routers/batch.py | 397 +++++++++++++++++++++++++++++ api/utils/media_validator.py | 366 ++++++++++++++++++++++++++ docs/API.md | 147 ++++++++++- worker/processors/video.py | 482 ++++++++++++++++++++++++++++++++++- worker/utils/ffmpeg.py | 77 +++++- 8 files changed, 1484 insertions(+), 11 deletions(-) create mode 100644 api/routers/batch.py create mode 100644 api/utils/media_validator.py diff --git a/README.md b/README.md index b2e13b3..1bd31a0 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,7 @@ POST /api/v1/convert # Universal media conversion POST /api/v1/analyze # Quality metrics (VMAF, PSNR, SSIM) POST /api/v1/stream # HLS/DASH adaptive streaming POST /api/v1/estimate # Processing time/cost estimation +POST /api/v1/batch # Batch processing (up to 100 jobs) ``` ### Job Management @@ -83,6 +84,8 @@ GET /api/v1/jobs # List and filter jobs GET /api/v1/jobs/{id} # Job status and progress GET /api/v1/jobs/{id}/events # Real-time progress (SSE) DELETE /api/v1/jobs/{id} # Cancel job +GET /api/v1/batch/{id} # Batch job status and progress +DELETE /api/v1/batch/{id} # Cancel entire batch ``` ### System & Health @@ -107,15 +110,27 @@ GET /docs # Interactive API documentation - **VMAF** - Perceptual video quality measurement - **PSNR** - Peak Signal-to-Noise Ratio - **SSIM** - Structural Similarity Index + +> **📊 Need detailed media analysis?** Check out our companion [FFprobe API](https://github.com/rendiffdev/ffprobe-api) for comprehensive media file inspection, metadata extraction, and format analysis. - **Bitrate Analysis** - Compression efficiency metrics ### Enterprise Security - **API Key Authentication** with role-based permissions -- **Rate Limiting** with configurable thresholds -- **Input Validation** prevents command injection +- **Advanced Rate Limiting** with Redis-backed distributed limiting +- **Input Validation** prevents command injection and malicious uploads +- **Media File Security** with comprehensive malware detection - **HTTPS/SSL** with automatic certificate management - **Security Headers** (HSTS, CSP, XSS protection) +- **Security Audit Logging** tracks suspicious activity + +### Advanced Features + +- **Adaptive Streaming** - HLS/DASH with multiple quality variants +- **Batch Processing** - Process up to 100 files simultaneously +- **Enhanced Thumbnails** - Multiple formats, grids, and quality options +- **Professional Watermarking** - Advanced positioning and opacity controls +- **Quality Analysis** - VMAF, PSNR, SSIM with reference comparison ### Production Monitoring diff --git a/api/main.py b/api/main.py index 5b61e6c..c704818 100644 --- a/api/main.py +++ b/api/main.py @@ -15,7 +15,7 @@ from api.config import settings from api.middleware.security import SecurityHeadersMiddleware, RateLimitMiddleware from api.models.database import init_db -from api.routers import admin, api_keys, convert, health, jobs +from api.routers import admin, api_keys, batch, convert, health, jobs from api.services.queue import QueueService from api.services.storage import StorageService from api.utils.error_handlers import ( @@ -149,6 +149,7 @@ def _configure_routes(application: FastAPI) -> None: application.include_router(health.router, prefix="/api/v1", tags=["health"]) application.include_router(convert.router, prefix="/api/v1", tags=["processing"]) application.include_router(jobs.router, prefix="/api/v1", tags=["jobs"]) + application.include_router(batch.router, prefix="/api/v1", tags=["batch"]) # Management routes application.include_router(api_keys.router, prefix="/api/v1", tags=["authentication"]) diff --git a/api/models/job.py b/api/models/job.py index 18f0487..2e036b8 100644 --- a/api/models/job.py +++ b/api/models/job.py @@ -107,6 +107,10 @@ class Job(Base): webhook_url = Column(String, nullable=True) webhook_events = Column(JSON, default=["complete", "error"]) + # Batch processing + batch_id = Column(String, nullable=True, index=True) + batch_index = Column(Integer, nullable=True) + # Indexes __table_args__ = ( Index("idx_job_status_created", "status", "created_at"), diff --git a/api/routers/batch.py b/api/routers/batch.py new file mode 100644 index 0000000..4fc9652 --- /dev/null +++ b/api/routers/batch.py @@ -0,0 +1,397 @@ +""" +Batch processing endpoint for multiple media files +""" +from typing import Dict, Any, List +from uuid import uuid4 + +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks +from sqlalchemy.ext.asyncio import AsyncSession +import structlog + +from api.config import settings +from api.dependencies import get_db, require_api_key +from api.models.job import Job, JobStatus, JobCreateResponse, JobResponse +from api.services.queue import QueueService +from api.services.storage import StorageService +from api.utils.validators import validate_input_path, validate_output_path, validate_operations +from api.utils.media_validator import media_validator +from pydantic import BaseModel + +logger = structlog.get_logger() +router = APIRouter() + +queue_service = QueueService() +storage_service = StorageService() + + +class BatchJob(BaseModel): + """Single job in a batch.""" + input: str + output: str + operations: List[Dict[str, Any]] = [] + options: Dict[str, Any] = {} + priority: str = "normal" + + +class BatchProcessRequest(BaseModel): + """Batch processing request model.""" + jobs: List[BatchJob] + batch_name: str = "" + webhook_url: str = None + webhook_events: List[str] = [] + validate_files: bool = True + + +class BatchProcessResponse(BaseModel): + """Batch processing response model.""" + batch_id: str + total_jobs: int + jobs: List[JobResponse] + estimated_cost: Dict[str, Any] + warnings: List[str] + + +@router.post("/batch", response_model=BatchProcessResponse) +async def create_batch_job( + request: BatchProcessRequest, + background_tasks: BackgroundTasks, + db: AsyncSession = Depends(get_db), + api_key: str = Depends(require_api_key), +) -> BatchProcessResponse: + """ + Create a batch of media processing jobs. + + This endpoint allows submitting multiple jobs at once for efficient processing. + Jobs in a batch can have different operations and priorities. + """ + try: + if not request.jobs: + raise HTTPException(status_code=400, detail="No jobs provided in batch") + + if len(request.jobs) > 100: # Reasonable batch limit + raise HTTPException(status_code=400, detail="Batch size exceeds maximum of 100 jobs") + + batch_id = str(uuid4()) + created_jobs = [] + warnings = [] + total_estimated_time = 0 + + logger.info( + "Starting batch job creation", + batch_id=batch_id, + total_jobs=len(request.jobs), + api_key=api_key[:8] + "..." if len(api_key) > 8 else api_key + ) + + # Validate all files first if requested + if request.validate_files: + file_paths = [job.input for job in request.jobs] + + # Get API key tier for validation limits + api_key_tier = _get_api_key_tier(api_key) + + validation_results = await media_validator.validate_batch_files( + file_paths, api_key_tier + ) + + if validation_results['invalid_files'] > 0: + invalid_files = [ + r for r in validation_results['results'] + if r['status'] == 'invalid' + ] + warnings.append(f"Found {len(invalid_files)} invalid files in batch") + + # Optionally fail the entire batch if any files are invalid + if len(invalid_files) == len(request.jobs): + raise HTTPException( + status_code=400, + detail="All files in batch failed validation" + ) + + # Create individual jobs + for i, job_request in enumerate(request.jobs): + try: + # Validate paths + input_backend, input_validated = await validate_input_path( + job_request.input, storage_service + ) + output_backend, output_validated = await validate_output_path( + job_request.output, storage_service + ) + + # Validate operations + operations_validated = validate_operations(job_request.operations) + + # Create job record + job = Job( + id=uuid4(), + status=JobStatus.QUEUED, + priority=job_request.priority, + input_path=input_validated, + output_path=output_validated, + options=job_request.options, + operations=operations_validated, + api_key=api_key, + webhook_url=request.webhook_url, + webhook_events=request.webhook_events, + batch_id=batch_id, # Link to batch + batch_index=i, # Position in batch + ) + + # Add to database + db.add(job) + await db.commit() + await db.refresh(job) + + # Queue the job + await queue_service.enqueue_job( + job_id=str(job.id), + priority=job_request.priority, + ) + + # Create job response + job_response = JobResponse( + id=job.id, + status=job.status, + priority=job.priority, + progress=0.0, + stage="queued", + created_at=job.created_at, + links={ + "self": f"/api/v1/jobs/{job.id}", + "events": f"/api/v1/jobs/{job.id}/events", + "logs": f"/api/v1/jobs/{job.id}/logs", + "cancel": f"/api/v1/jobs/{job.id}", + "batch": f"/api/v1/batch/{batch_id}" + }, + ) + + created_jobs.append(job_response) + + # Estimate processing time (simplified) + estimated_time = _estimate_job_time(job_request) + total_estimated_time += estimated_time + + logger.info( + "Batch job created", + job_id=str(job.id), + batch_id=batch_id, + batch_index=i, + input_path=job_request.input[:50] + "..." if len(job_request.input) > 50 else job_request.input + ) + + except Exception as e: + logger.error( + "Failed to create batch job", + batch_id=batch_id, + batch_index=i, + error=str(e) + ) + warnings.append(f"Job {i+1} failed to create: {str(e)}") + + if not created_jobs: + raise HTTPException(status_code=500, detail="Failed to create any jobs in batch") + + # Estimate cost + estimated_cost = { + "processing_time_seconds": total_estimated_time, + "credits": 0, # For self-hosted, no credits + "jobs_created": len(created_jobs), + "jobs_failed": len(request.jobs) - len(created_jobs) + } + + logger.info( + "Batch job creation completed", + batch_id=batch_id, + jobs_created=len(created_jobs), + total_estimated_time=total_estimated_time + ) + + return BatchProcessResponse( + batch_id=batch_id, + total_jobs=len(created_jobs), + jobs=created_jobs, + estimated_cost=estimated_cost, + warnings=warnings + ) + + except HTTPException: + raise + except Exception as e: + logger.error("Batch job creation failed", error=str(e)) + raise HTTPException(status_code=500, detail="Failed to create batch job") + + +@router.get("/batch/{batch_id}") +async def get_batch_status( + batch_id: str, + db: AsyncSession = Depends(get_db), + api_key: str = Depends(require_api_key), +) -> Dict[str, Any]: + """Get status of a batch job.""" + try: + # Query all jobs in the batch + from sqlalchemy import select + result = await db.execute( + select(Job).where(Job.batch_id == batch_id, Job.api_key == api_key) + ) + batch_jobs = result.scalars().all() + + if not batch_jobs: + raise HTTPException(status_code=404, detail="Batch not found") + + # Calculate batch statistics + total_jobs = len(batch_jobs) + completed_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.COMPLETED) + failed_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.FAILED) + processing_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.PROCESSING) + queued_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.QUEUED) + + # Calculate overall progress + total_progress = sum(job.progress or 0 for job in batch_jobs) + overall_progress = total_progress / total_jobs if total_jobs > 0 else 0 + + # Determine batch status + if completed_jobs == total_jobs: + batch_status = "completed" + elif failed_jobs == total_jobs: + batch_status = "failed" + elif failed_jobs > 0 and completed_jobs + failed_jobs == total_jobs: + batch_status = "partial_success" + elif processing_jobs > 0 or queued_jobs > 0: + batch_status = "processing" + else: + batch_status = "unknown" + + return { + "batch_id": batch_id, + "status": batch_status, + "progress": overall_progress, + "statistics": { + "total_jobs": total_jobs, + "completed": completed_jobs, + "failed": failed_jobs, + "processing": processing_jobs, + "queued": queued_jobs + }, + "jobs": [ + { + "id": str(job.id), + "status": job.status, + "progress": job.progress or 0, + "created_at": job.created_at, + "started_at": job.started_at, + "completed_at": job.completed_at, + "input_path": job.input_path, + "output_path": job.output_path + } + for job in sorted(batch_jobs, key=lambda x: x.batch_index or 0) + ] + } + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to get batch status", batch_id=batch_id, error=str(e)) + raise HTTPException(status_code=500, detail="Failed to get batch status") + + +@router.delete("/batch/{batch_id}") +async def cancel_batch( + batch_id: str, + db: AsyncSession = Depends(get_db), + api_key: str = Depends(require_api_key), +) -> Dict[str, Any]: + """Cancel all jobs in a batch.""" + try: + # Query all jobs in the batch + from sqlalchemy import select, update + result = await db.execute( + select(Job).where(Job.batch_id == batch_id, Job.api_key == api_key) + ) + batch_jobs = result.scalars().all() + + if not batch_jobs: + raise HTTPException(status_code=404, detail="Batch not found") + + cancelled_count = 0 + failed_to_cancel = 0 + + for job in batch_jobs: + if job.status in [JobStatus.QUEUED, JobStatus.PROCESSING]: + try: + # Cancel job in queue + if job.status == JobStatus.QUEUED: + success = await queue_service.cancel_job(str(job.id)) + else: # PROCESSING + success = await queue_service.cancel_running_job( + str(job.id), + job.worker_id or "" + ) + + if success: + # Update job status + await db.execute( + update(Job) + .where(Job.id == job.id) + .values(status=JobStatus.CANCELLED) + ) + cancelled_count += 1 + else: + failed_to_cancel += 1 + + except Exception as e: + logger.error( + "Failed to cancel job in batch", + job_id=str(job.id), + batch_id=batch_id, + error=str(e) + ) + failed_to_cancel += 1 + + await db.commit() + + return { + "batch_id": batch_id, + "total_jobs": len(batch_jobs), + "cancelled": cancelled_count, + "failed_to_cancel": failed_to_cancel, + "message": f"Cancelled {cancelled_count} jobs in batch" + } + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to cancel batch", batch_id=batch_id, error=str(e)) + raise HTTPException(status_code=500, detail="Failed to cancel batch") + + +def _get_api_key_tier(api_key: str) -> str: + """Determine API key tier from key prefix.""" + if api_key.startswith('ent_'): + return 'enterprise' + elif api_key.startswith('prem_'): + return 'premium' + elif api_key.startswith('basic_'): + return 'basic' + else: + return 'free' + + +def _estimate_job_time(job_request: BatchJob) -> int: + """Estimate processing time for a single job in seconds.""" + base_time = 60 # Base processing time + + # Add time based on operations + for operation in job_request.operations: + op_type = operation.get('type', '') + if op_type == 'streaming': + base_time += 300 # Streaming takes longer + elif op_type == 'transcode': + base_time += 120 # Transcoding time + elif op_type in ['watermark', 'filter']: + base_time += 60 # Filter operations + else: + base_time += 30 # Other operations + + return base_time \ No newline at end of file diff --git a/api/utils/media_validator.py b/api/utils/media_validator.py new file mode 100644 index 0000000..837ab96 --- /dev/null +++ b/api/utils/media_validator.py @@ -0,0 +1,366 @@ +""" +Media file validation utility for security and integrity checks. +Prevents malicious uploads and ensures file safety. +""" +import hashlib +import magic +import os +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import structlog + +from worker.utils.ffmpeg import FFmpegWrapper, FFmpegError + +logger = structlog.get_logger() + + +class MediaValidationError(Exception): + """Exception raised for media validation failures.""" + pass + + +class MaliciousFileError(MediaValidationError): + """Exception raised for potentially malicious files.""" + pass + + +class MediaValidator: + """Advanced media file validator for security and integrity.""" + + def __init__(self): + self.ffmpeg = FFmpegWrapper() + + # Allowed MIME types for media files + self.allowed_mime_types = { + # Video formats + 'video/mp4', + 'video/quicktime', + 'video/x-msvideo', # AVI + 'video/x-matroska', # MKV + 'video/webm', + 'video/x-flv', + 'video/3gpp', + 'video/mp2t', # MPEG-TS + 'video/mpeg', + 'video/ogg', + + # Audio formats + 'audio/mpeg', # MP3 + 'audio/mp4', # M4A/AAC + 'audio/wav', + 'audio/x-wav', + 'audio/ogg', + 'audio/flac', + 'audio/x-flac', + 'audio/aac', + 'audio/webm', + } + + # Dangerous file extensions that should never be processed + self.dangerous_extensions = { + '.exe', '.bat', '.cmd', '.com', '.scr', '.pif', '.vbs', '.js', + '.jar', '.app', '.deb', '.rpm', '.dmg', '.pkg', '.msi', + '.sh', '.bash', '.zsh', '.fish', '.ps1', '.php', '.asp', '.jsp' + } + + # Maximum file sizes (in bytes) + self.max_file_sizes = { + 'free': 100 * 1024 * 1024, # 100MB + 'basic': 500 * 1024 * 1024, # 500MB + 'premium': 2 * 1024 * 1024 * 1024, # 2GB + 'enterprise': 10 * 1024 * 1024 * 1024 # 10GB + } + + # Known malicious file signatures (hex patterns) + self.malicious_signatures = [ + b'MZ', # PE executable header + b'\x7fELF', # ELF executable header + b'\xca\xfe\xba\xbe', # Java class file + b'PK\x03\x04', # ZIP header (could contain malicious content) + ] + + async def validate_media_file(self, file_path: str, api_key_tier: str = 'free', + check_content: bool = True) -> Dict[str, any]: + """ + Comprehensive media file validation. + + Args: + file_path: Path to the media file + api_key_tier: API key tier for size limits + check_content: Whether to perform deep content analysis + + Returns: + Dict with validation results + + Raises: + MediaValidationError: If validation fails + MaliciousFileError: If file appears malicious + """ + try: + await self.ffmpeg.initialize() + + validation_results = { + 'file_path': file_path, + 'is_valid': False, + 'file_size': 0, + 'mime_type': None, + 'format_info': None, + 'security_checks': {}, + 'warnings': [], + 'errors': [] + } + + # Basic file existence and accessibility + if not os.path.exists(file_path): + raise MediaValidationError(f"File not found: {file_path}") + + if not os.access(file_path, os.R_OK): + raise MediaValidationError(f"File not readable: {file_path}") + + # File size validation + file_size = os.path.getsize(file_path) + validation_results['file_size'] = file_size + + max_size = self.max_file_sizes.get(api_key_tier, self.max_file_sizes['free']) + if file_size > max_size: + raise MediaValidationError( + f"File size {file_size} exceeds limit {max_size} for tier {api_key_tier}" + ) + + if file_size == 0: + raise MediaValidationError("File is empty") + + # Extension validation + file_ext = Path(file_path).suffix.lower() + if file_ext in self.dangerous_extensions: + raise MaliciousFileError(f"Dangerous file extension: {file_ext}") + + # MIME type validation + mime_type = self._get_mime_type(file_path) + validation_results['mime_type'] = mime_type + + if mime_type not in self.allowed_mime_types: + raise MediaValidationError(f"Unsupported MIME type: {mime_type}") + + # Security checks + security_results = await self._perform_security_checks(file_path) + validation_results['security_checks'] = security_results + + if security_results.get('malicious_signatures'): + raise MaliciousFileError("File contains malicious signatures") + + # Content validation with FFmpeg/FFprobe + if check_content: + try: + format_info = await self._validate_media_content(file_path) + validation_results['format_info'] = format_info + + # Additional content-based security checks + content_security = await self._check_content_security(file_path, format_info) + if content_security.get('suspicious'): + validation_results['warnings'].extend(content_security.get('warnings', [])) + + except FFmpegError as e: + raise MediaValidationError(f"Media content validation failed: {e}") + + validation_results['is_valid'] = True + + logger.info( + "Media file validation successful", + file_path=file_path, + file_size=file_size, + mime_type=mime_type, + tier=api_key_tier + ) + + return validation_results + + except Exception as e: + logger.error("Media validation failed", file_path=file_path, error=str(e)) + raise + + def _get_mime_type(self, file_path: str) -> str: + """Get MIME type using python-magic.""" + try: + mime = magic.Magic(mime=True) + return mime.from_file(file_path) + except Exception: + # Fallback to basic extension-based detection + ext = Path(file_path).suffix.lower() + mime_map = { + '.mp4': 'video/mp4', + '.avi': 'video/x-msvideo', + '.mov': 'video/quicktime', + '.mkv': 'video/x-matroska', + '.webm': 'video/webm', + '.mp3': 'audio/mpeg', + '.wav': 'audio/wav', + '.flac': 'audio/flac' + } + return mime_map.get(ext, 'application/octet-stream') + + async def _perform_security_checks(self, file_path: str) -> Dict[str, any]: + """Perform security-focused file analysis.""" + security_results = { + 'malicious_signatures': False, + 'file_hash': None, + 'entropy_analysis': None, + 'embedded_executables': False + } + + try: + # Calculate file hash for integrity + with open(file_path, 'rb') as f: + file_content = f.read(8192) # Read first 8KB for analysis + security_results['file_hash'] = hashlib.sha256(f.read()).hexdigest() + + # Check for malicious signatures in file header + for signature in self.malicious_signatures: + if signature in file_content: + security_results['malicious_signatures'] = True + break + + # Basic entropy analysis (high entropy might indicate packed/encrypted content) + if len(file_content) > 0: + entropy = self._calculate_entropy(file_content) + security_results['entropy_analysis'] = { + 'entropy': entropy, + 'suspicious': entropy > 7.5 # High entropy threshold + } + + return security_results + + except Exception as e: + logger.warning("Security check failed", file_path=file_path, error=str(e)) + return security_results + + def _calculate_entropy(self, data: bytes) -> float: + """Calculate Shannon entropy of data.""" + if not data: + return 0 + + # Count byte frequencies + freq = [0] * 256 + for byte in data: + freq[byte] += 1 + + # Calculate entropy + entropy = 0 + data_len = len(data) + for count in freq: + if count > 0: + p = count / data_len + entropy -= p * (p.bit_length() - 1) + + return entropy + + async def _validate_media_content(self, file_path: str) -> Dict[str, any]: + """Validate media content using FFprobe.""" + try: + probe_info = await self.ffmpeg.probe_file(file_path) + + # Basic format validation + format_info = probe_info.get('format', {}) + streams = probe_info.get('streams', []) + + if not streams: + raise MediaValidationError("No streams found in media file") + + # Validate stream types + valid_stream_types = {'video', 'audio', 'subtitle', 'attachment'} + for stream in streams: + codec_type = stream.get('codec_type', '') + if codec_type not in valid_stream_types: + raise MediaValidationError(f"Invalid stream type: {codec_type}") + + return { + 'format_name': format_info.get('format_name', ''), + 'duration': float(format_info.get('duration', 0)), + 'bit_rate': int(format_info.get('bit_rate', 0)), + 'size': int(format_info.get('size', 0)), + 'nb_streams': format_info.get('nb_streams', 0), + 'streams': [ + { + 'index': s.get('index', 0), + 'codec_type': s.get('codec_type', ''), + 'codec_name': s.get('codec_name', ''), + 'duration': float(s.get('duration', 0)) + } + for s in streams + ] + } + + except Exception as e: + raise MediaValidationError(f"Content validation failed: {e}") + + async def _check_content_security(self, file_path: str, format_info: Dict) -> Dict[str, any]: + """Perform additional security checks on media content.""" + security_info = { + 'suspicious': False, + 'warnings': [] + } + + try: + # Check for suspicious duration (extremely long files) + duration = format_info.get('duration', 0) + if duration > 86400: # > 24 hours + security_info['warnings'].append(f"Unusually long duration: {duration}s") + security_info['suspicious'] = True + + # Check for suspicious stream count + stream_count = format_info.get('nb_streams', 0) + if stream_count > 50: # Unusual number of streams + security_info['warnings'].append(f"Unusual stream count: {stream_count}") + security_info['suspicious'] = True + + # Check for suspicious codec combinations + streams = format_info.get('streams', []) + codec_names = [s.get('codec_name', '') for s in streams] + + # Flag unusual or potentially dangerous codecs + suspicious_codecs = {'bintext', 'idf', 'executable'} + for codec in codec_names: + if codec in suspicious_codecs: + security_info['warnings'].append(f"Suspicious codec: {codec}") + security_info['suspicious'] = True + + return security_info + + except Exception as e: + logger.warning("Content security check failed", error=str(e)) + return security_info + + async def validate_batch_files(self, file_paths: List[str], + api_key_tier: str = 'free') -> Dict[str, any]: + """Validate multiple files in batch.""" + results = { + 'total_files': len(file_paths), + 'valid_files': 0, + 'invalid_files': 0, + 'total_size': 0, + 'results': [] + } + + for file_path in file_paths: + try: + validation = await self.validate_media_file(file_path, api_key_tier) + results['valid_files'] += 1 + results['total_size'] += validation['file_size'] + results['results'].append({ + 'file_path': file_path, + 'status': 'valid', + 'validation': validation + }) + except Exception as e: + results['invalid_files'] += 1 + results['results'].append({ + 'file_path': file_path, + 'status': 'invalid', + 'error': str(e) + }) + + return results + + +# Global validator instance +media_validator = MediaValidator() \ No newline at end of file diff --git a/docs/API.md b/docs/API.md index f4593cb..e336b4c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -812,4 +812,149 @@ The API automatically redirects HTTP traffic to HTTPS when SSL is enabled. - API Documentation: http://localhost:8000/docs - OpenAPI Schema: http://localhost:8000/openapi.json - Health Check: http://localhost:8000/api/v1/health -- Metrics: http://localhost:9090 (if monitoring enabled) \ No newline at end of file +- Metrics: http://localhost:9090 (if monitoring enabled) + +## Advanced Features + +### Batch Processing + +Process multiple files simultaneously with batch operations: + +```bash +curl -X POST "http://localhost:8000/api/v1/batch" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "jobs": [ + { + "input": "/storage/video1.mp4", + "output": "/storage/output1.mp4", + "operations": [{"type": "transcode", "params": {"video_codec": "h264"}}] + }, + { + "input": "/storage/video2.avi", + "output": "/storage/output2.webm", + "operations": [{"type": "transcode", "params": {"video_codec": "vp9"}}] + } + ], + "batch_name": "Daily Processing", + "validate_files": true + }' +``` + +### Enhanced Thumbnails + +Create professional thumbnails with multiple options: + +```bash +# Single high-quality thumbnail +curl -X POST "http://localhost:8000/api/v1/convert" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "input": "/storage/video.mp4", + "output": "/storage/thumb.jpg", + "operations": [ + { + "type": "thumbnail", + "params": { + "timestamp": 30, + "width": 1920, + "height": 1080, + "quality": "high" + } + } + ] + }' + +# Multiple thumbnails at intervals +curl -X POST "http://localhost:8000/api/v1/convert" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "input": "/storage/video.mp4", + "output": "/storage/thumbnails/", + "operations": [ + { + "type": "thumbnail_grid", + "params": { + "rows": 3, + "cols": 4, + "width": 1280, + "height": 720 + } + } + ] + }' +``` + +### Adaptive Streaming + +Generate HLS/DASH streams with multiple quality variants: + +```bash +curl -X POST "http://localhost:8000/api/v1/stream" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "input": "/storage/video.mp4", + "output": "/storage/streams/", + "type": "hls", + "variants": [ + {"resolution": "1920x1080", "bitrate": "5000k", "name": "1080p"}, + {"resolution": "1280x720", "bitrate": "2500k", "name": "720p"}, + {"resolution": "854x480", "bitrate": "1000k", "name": "480p"} + ], + "segment_duration": 6 + }' +``` + +### Quality Analysis + +Analyze video quality with industry-standard metrics: + +```bash +curl -X POST "http://localhost:8000/api/v1/analyze" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "input": "/storage/processed.mp4", + "reference": "/storage/original.mp4", + "metrics": ["vmaf", "psnr", "ssim"] + }' +``` + +### Advanced Watermarking + +Professional watermark placement with precise control: + +```bash +curl -X POST "http://localhost:8000/api/v1/convert" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-api-key" \ + -d '{ + "input": "/storage/video.mp4", + "output": "/storage/watermarked.mp4", + "operations": [ + { + "type": "watermark", + "params": { + "watermark_path": "/storage/logo.png", + "position": "bottom-right", + "opacity": 0.8, + "scale": 0.15 + } + } + ] + }' +``` + +### Media File Security + +All uploaded files are automatically validated for security: + +- **Malware Detection**: Scans for malicious file signatures +- **MIME Type Validation**: Ensures files are legitimate media +- **Content Analysis**: Deep inspection with FFprobe +- **Size Limits**: Configurable per API key tier +- **Entropy Analysis**: Detects packed/encrypted content \ No newline at end of file diff --git a/worker/processors/video.py b/worker/processors/video.py index 55c426b..ee7698c 100644 --- a/worker/processors/video.py +++ b/worker/processors/video.py @@ -2,6 +2,7 @@ Video processing module with production-grade FFmpeg integration. """ import asyncio +import json import os import tempfile from pathlib import Path @@ -374,14 +375,25 @@ async def get_video_info(self, file_path: str) -> Dict[str, Any]: raise VideoProcessingError(f"Failed to get video info: {e}") async def create_thumbnail(self, input_path: str, output_path: str, - timestamp: float = 10.0, width: int = 320, height: int = 240) -> Dict[str, Any]: - """Create a thumbnail image from video at specified timestamp.""" + timestamp: float = 10.0, width: int = 320, height: int = 240, + quality: str = "high") -> Dict[str, Any]: + """Create a high-quality thumbnail image from video at specified timestamp.""" await self.initialize() try: # Ensure output directory exists Path(output_path).parent.mkdir(parents=True, exist_ok=True) + # Quality settings + quality_settings = { + 'low': {'qscale': 10, 'format': 'mjpeg'}, + 'medium': {'qscale': 5, 'format': 'mjpeg'}, + 'high': {'qscale': 2, 'format': 'mjpeg'}, + 'png': {'format': 'png'} + } + + quality_config = quality_settings.get(quality, quality_settings['high']) + # Create thumbnail using FFmpeg options = { 'format': 'image2', @@ -398,7 +410,8 @@ async def create_thumbnail(self, input_path: str, output_path: str, 'params': { 'width': width, 'height': height, - 'video_codec': 'mjpeg' + 'video_codec': quality_config['format'], + **({} if 'qscale' not in quality_config else {'qscale': quality_config['qscale']}) } } ] @@ -415,9 +428,468 @@ async def create_thumbnail(self, input_path: str, output_path: str, 'success': True, 'thumbnail_path': output_path, 'timestamp': timestamp, - 'dimensions': f"{width}x{height}" + 'dimensions': f"{width}x{height}", + 'quality': quality } except Exception as e: logger.error("Thumbnail creation failed", error=str(e)) - raise VideoProcessingError(f"Thumbnail creation failed: {e}") \ No newline at end of file + raise VideoProcessingError(f"Thumbnail creation failed: {e}") + + async def create_thumbnail_grid(self, input_path: str, output_path: str, + rows: int = 3, cols: int = 4, width: int = 1280, height: int = 720) -> Dict[str, Any]: + """Create a thumbnail grid showing multiple frames from the video.""" + await self.initialize() + + try: + # Get video duration to calculate timestamps + duration = await self.ffmpeg.get_file_duration(input_path) + if duration <= 0: + raise VideoProcessingError("Could not determine video duration") + + # Calculate grid dimensions + tile_width = width // cols + tile_height = height // rows + total_tiles = rows * cols + + # Skip first and last 10% of video for better thumbnails + start_time = duration * 0.1 + end_time = duration * 0.9 + interval = (end_time - start_time) / (total_tiles - 1) + + # Ensure output directory exists + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + # Create grid using FFmpeg tile filter + options = { + 'format': 'image2', + 'vframes': 1 + } + + operations = [ + { + 'type': 'filter', + 'params': { + 'select': f'not(mod(n\\,{int(duration * 25 / total_tiles)}))', # Assuming 25fps + 'tile': f'{cols}x{rows}', + 'scale': f'{tile_width}:{tile_height}' + } + } + ] + + result = await self.ffmpeg.execute_command( + input_path=input_path, + output_path=output_path, + options=options, + operations=operations, + timeout=120 # 2 minute timeout for grid + ) + + return { + 'success': True, + 'thumbnail_grid_path': output_path, + 'dimensions': f"{width}x{height}", + 'grid_size': f"{cols}x{rows}", + 'total_frames': total_tiles + } + + except Exception as e: + logger.error("Thumbnail grid creation failed", error=str(e)) + raise VideoProcessingError(f"Thumbnail grid creation failed: {e}") + + async def create_multiple_thumbnails(self, input_path: str, output_dir: str, + count: int = 5, width: int = 320, height: int = 240, + quality: str = "high") -> Dict[str, Any]: + """Create multiple thumbnails at evenly spaced intervals.""" + await self.initialize() + + try: + # Get video duration + duration = await self.ffmpeg.get_file_duration(input_path) + if duration <= 0: + raise VideoProcessingError("Could not determine video duration") + + # Calculate timestamps (skip first and last 10%) + start_time = duration * 0.1 + end_time = duration * 0.9 + interval = (end_time - start_time) / (count - 1) if count > 1 else 0 + + # Ensure output directory exists + Path(output_dir).mkdir(parents=True, exist_ok=True) + + thumbnails = [] + for i in range(count): + timestamp = start_time + (i * interval) + output_path = f"{output_dir}/thumb_{i+1:03d}_{int(timestamp)}s.jpg" + + result = await self.create_thumbnail( + input_path=input_path, + output_path=output_path, + timestamp=timestamp, + width=width, + height=height, + quality=quality + ) + + thumbnails.append({ + 'path': output_path, + 'timestamp': timestamp, + 'index': i + 1 + }) + + return { + 'success': True, + 'thumbnails': thumbnails, + 'count': count, + 'total_duration': duration + } + + except Exception as e: + logger.error("Multiple thumbnails creation failed", error=str(e)) + raise VideoProcessingError(f"Multiple thumbnails creation failed: {e}") + + async def create_adaptive_stream(self, input_path: str, output_path: str, + format_type: str = "hls", variants: List[Dict] = None, + segment_duration: int = 6) -> Dict[str, Any]: + """Create adaptive streaming formats (HLS/DASH) with multiple quality variants.""" + await self.initialize() + + try: + # Default variants if none provided + if variants is None: + variants = [ + {"resolution": "1920x1080", "bitrate": "5000k", "name": "1080p"}, + {"resolution": "1280x720", "bitrate": "2500k", "name": "720p"}, + {"resolution": "854x480", "bitrate": "1000k", "name": "480p"}, + {"resolution": "640x360", "bitrate": "500k", "name": "360p"} + ] + + # Ensure output directory exists + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + # Set output path based on format + if format_type == "hls": + output_file = f"{output_path}/playlist.m3u8" + else: # dash + output_file = f"{output_path}/manifest.mpd" + + # Configure streaming options + options = { + 'format': format_type + } + + operations = [ + { + 'type': 'streaming', + 'params': { + 'format': format_type, + 'segment_time': segment_duration, + 'variants': variants + } + } + ] + + # Add multi-variant encoding for adaptive streaming + for i, variant in enumerate(variants): + operations.append({ + 'type': 'transcode', + 'params': { + 'video_codec': 'h264', + 'audio_codec': 'aac', + 'video_bitrate': variant['bitrate'], + 'width': int(variant['resolution'].split('x')[0]), + 'height': int(variant['resolution'].split('x')[1]), + 'preset': 'medium' + } + }) + + # Calculate timeout based on video duration and number of variants + duration = await self.ffmpeg.get_file_duration(input_path) + timeout = max(600, int(duration * len(variants) * 2)) # 2x realtime per variant + + logger.info( + "Starting adaptive streaming creation", + format=format_type, + variants=len(variants), + duration=duration, + timeout=timeout + ) + + result = await self.ffmpeg.execute_command( + input_path=input_path, + output_path=output_file, + options=options, + operations=operations, + timeout=timeout + ) + + return { + 'success': True, + 'output_path': output_path, + 'manifest_file': output_file, + 'format': format_type, + 'variants': variants, + 'segment_duration': segment_duration, + 'total_duration': duration + } + + except Exception as e: + logger.error("Adaptive streaming creation failed", error=str(e)) + raise VideoProcessingError(f"Adaptive streaming creation failed: {e}") + + async def analyze_quality(self, input_path: str, reference_path: str = None, + metrics: List[str] = None) -> Dict[str, Any]: + """Analyze video quality using VMAF, PSNR, and SSIM metrics.""" + await self.initialize() + + try: + if metrics is None: + metrics = ['vmaf', 'psnr', 'ssim'] + + analysis_results = {} + + # Create temporary files for analysis + with tempfile.TemporaryDirectory() as temp_dir: + for metric in metrics: + if metric.lower() == 'vmaf' and reference_path: + # VMAF requires a reference video + result = await self._analyze_vmaf(input_path, reference_path, temp_dir) + analysis_results['vmaf'] = result + elif metric.lower() == 'psnr' and reference_path: + # PSNR comparison with reference + result = await self._analyze_psnr(input_path, reference_path, temp_dir) + analysis_results['psnr'] = result + elif metric.lower() == 'ssim' and reference_path: + # SSIM comparison with reference + result = await self._analyze_ssim(input_path, reference_path, temp_dir) + analysis_results['ssim'] = result + else: + # Basic video quality metrics without reference + result = await self._analyze_basic_quality(input_path, temp_dir) + analysis_results['basic'] = result + + return { + 'success': True, + 'input_path': input_path, + 'reference_path': reference_path, + 'metrics': analysis_results, + 'analyzed_metrics': metrics + } + + except Exception as e: + logger.error("Quality analysis failed", error=str(e)) + raise VideoProcessingError(f"Quality analysis failed: {e}") + + async def _analyze_vmaf(self, input_path: str, reference_path: str, temp_dir: str) -> Dict[str, Any]: + """Analyze VMAF score between input and reference video.""" + vmaf_log = f"{temp_dir}/vmaf.json" + + operations = [ + { + 'type': 'filter', + 'params': { + 'vmaf': f'model_path=/usr/share/vmaf/vmaf_v0.6.1.pkl:log_path={vmaf_log}:log_fmt=json' + } + } + ] + + # Use reference as second input + cmd = ['ffmpeg', '-y', '-i', input_path, '-i', reference_path] + cmd.extend(['-lavfi', f'[0:v][1:v]libvmaf=model_path=/usr/share/vmaf/vmaf_v0.6.1.pkl:log_path={vmaf_log}:log_fmt=json']) + cmd.extend(['-f', 'null', '-']) + + try: + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await process.wait() + + # Read VMAF results + if os.path.exists(vmaf_log): + with open(vmaf_log, 'r') as f: + vmaf_data = json.load(f) + + return { + 'mean_score': vmaf_data.get('pooled_metrics', {}).get('vmaf', {}).get('mean', 0), + 'min_score': vmaf_data.get('pooled_metrics', {}).get('vmaf', {}).get('min', 0), + 'max_score': vmaf_data.get('pooled_metrics', {}).get('vmaf', {}).get('max', 0), + 'harmonic_mean': vmaf_data.get('pooled_metrics', {}).get('vmaf', {}).get('harmonic_mean', 0) + } + else: + return {'error': 'VMAF analysis failed - no output generated'} + + except Exception as e: + return {'error': f'VMAF analysis failed: {str(e)}'} + + async def _analyze_psnr(self, input_path: str, reference_path: str, temp_dir: str) -> Dict[str, Any]: + """Analyze PSNR between input and reference video.""" + psnr_log = f"{temp_dir}/psnr.log" + + cmd = ['ffmpeg', '-y', '-i', input_path, '-i', reference_path] + cmd.extend(['-lavfi', f'[0:v][1:v]psnr=stats_file={psnr_log}']) + cmd.extend(['-f', 'null', '-']) + + try: + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await process.wait() + + # Parse PSNR results from stderr + if os.path.exists(psnr_log): + with open(psnr_log, 'r') as f: + psnr_lines = f.readlines() + + # Extract average PSNR values + if psnr_lines: + last_line = psnr_lines[-1] + # Parse PSNR values from the summary line + psnr_values = {} + if 'average:' in last_line: + parts = last_line.split('average:')[1].split() + for part in parts: + if ':' in part and 'inf' not in part: + key, value = part.split(':') + try: + psnr_values[key] = float(value) + except ValueError: + continue + + return psnr_values + + return {'error': 'PSNR analysis failed - no output generated'} + + except Exception as e: + return {'error': f'PSNR analysis failed: {str(e)}'} + + async def _analyze_ssim(self, input_path: str, reference_path: str, temp_dir: str) -> Dict[str, Any]: + """Analyze SSIM between input and reference video.""" + cmd = ['ffmpeg', '-y', '-i', input_path, '-i', reference_path] + cmd.extend(['-lavfi', '[0:v][1:v]ssim']) + cmd.extend(['-f', 'null', '-']) + + try: + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + # Parse SSIM from stderr + stderr_text = stderr.decode('utf-8') + ssim_values = {} + + for line in stderr_text.split('\n'): + if 'SSIM' in line and 'All:' in line: + # Extract SSIM values + parts = line.split() + for part in parts: + if part.startswith('All:') or part.startswith('Y:') or part.startswith('U:') or part.startswith('V:'): + key, value = part.split(':') + try: + ssim_values[key.lower()] = float(value.split('(')[0]) + except (ValueError, IndexError): + continue + + return ssim_values if ssim_values else {'error': 'SSIM analysis failed'} + + except Exception as e: + return {'error': f'SSIM analysis failed: {str(e)}'} + + async def _analyze_basic_quality(self, input_path: str, temp_dir: str) -> Dict[str, Any]: + """Analyze basic video quality metrics without reference.""" + try: + # Get basic video information + probe_info = await self.ffmpeg.probe_file(input_path) + video_stream = next((s for s in probe_info.get('streams', []) + if s.get('codec_type') == 'video'), {}) + + # Calculate basic quality indicators + width = video_stream.get('width', 0) + height = video_stream.get('height', 0) + bitrate = int(video_stream.get('bit_rate', 0)) + fps = self._parse_fps(video_stream.get('r_frame_rate', '0/1')) + + # Quality score based on resolution and bitrate + pixel_count = width * height + if pixel_count > 0: + bits_per_pixel = bitrate / (pixel_count * fps) if fps > 0 else 0 + else: + bits_per_pixel = 0 + + return { + 'resolution': f"{width}x{height}", + 'bitrate': bitrate, + 'fps': fps, + 'bits_per_pixel': bits_per_pixel, + 'quality_score': min(100, max(0, bits_per_pixel * 50)) # Normalized quality score + } + + except Exception as e: + return {'error': f'Basic quality analysis failed: {str(e)}'} + + async def create_watermarked_video(self, input_path: str, output_path: str, + watermark_path: str, position: str = "top-right", + opacity: float = 0.7, scale: float = 0.1) -> Dict[str, Any]: + """Create a watermarked video with enhanced positioning and scaling options.""" + await self.initialize() + + try: + # Ensure output directory exists + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + # Position mappings + position_map = { + 'top-left': '10:10', + 'top-right': 'W-w-10:10', + 'bottom-left': '10:H-h-10', + 'bottom-right': 'W-w-10:H-h-10', + 'center': '(W-w)/2:(H-h)/2' + } + + overlay_position = position_map.get(position, 'W-w-10:10') + + # Create watermark overlay filter + watermark_filter = f"movie={watermark_path}:loop=1,setpts=N/(FRAME_RATE*TB),scale=iw*{scale}:ih*{scale},format=rgba,colorchannelmixer=aa={opacity}[watermark];[in][watermark]overlay={overlay_position}[out]" + + operations = [ + { + 'type': 'filter', + 'params': { + 'complex_filter': watermark_filter + } + }, + { + 'type': 'transcode', + 'params': { + 'video_codec': 'h264', + 'audio_codec': 'copy', # Copy audio without re-encoding + 'preset': 'medium' + } + } + ] + + result = await self.ffmpeg.execute_command( + input_path=input_path, + output_path=output_path, + options={}, + operations=operations, + timeout=self._calculate_timeout(await self.ffmpeg.get_file_duration(input_path), operations) + ) + + return { + 'success': True, + 'output_path': output_path, + 'watermark_position': position, + 'watermark_opacity': opacity, + 'watermark_scale': scale + } + + except Exception as e: + logger.error("Watermark creation failed", error=str(e)) + raise VideoProcessingError(f"Watermark creation failed: {e}") \ No newline at end of file diff --git a/worker/utils/ffmpeg.py b/worker/utils/ffmpeg.py index 159b7fa..878d77f 100644 --- a/worker/utils/ffmpeg.py +++ b/worker/utils/ffmpeg.py @@ -193,6 +193,8 @@ def build_command(self, input_path: str, output_path: str, video_filters.extend(self._handle_filters(params)) elif op_type == 'stream_map': cmd.extend(self._handle_stream_map(params)) + elif op_type == 'streaming': + cmd.extend(self._handle_streaming(params)) # Add video filters if video_filters: @@ -269,7 +271,7 @@ def _validate_operations(self, operations: List[Dict[str, Any]]): if not isinstance(operations, list): raise FFmpegCommandError("Operations must be a list") - allowed_operation_types = {'transcode', 'trim', 'watermark', 'filter', 'stream_map'} + allowed_operation_types = {'transcode', 'trim', 'watermark', 'filter', 'stream_map', 'streaming'} for i, operation in enumerate(operations): if not isinstance(operation, dict): @@ -296,6 +298,8 @@ def _validate_operation_params(self, op_type: str, params: Dict[str, Any]): self._validate_filter_params(params) elif op_type == 'watermark': self._validate_watermark_params(params) + elif op_type == 'streaming': + self._validate_streaming_params(params) def _validate_transcode_params(self, params: Dict[str, Any]): """Validate transcoding parameters.""" @@ -356,6 +360,39 @@ def _validate_watermark_params(self, params: Dict[str, Any]): if not isinstance(opacity, (int, float)) or opacity < 0 or opacity > 1: raise FFmpegCommandError(f"Invalid opacity: {opacity}") + def _validate_streaming_params(self, params: Dict[str, Any]): + """Validate streaming parameters.""" + # Validate streaming format + if 'format' in params: + allowed_formats = {'hls', 'dash'} + if params['format'] not in allowed_formats: + raise FFmpegCommandError(f"Invalid streaming format: {params['format']}") + + # Validate segment duration + if 'segment_time' in params: + segment_time = params['segment_time'] + if not isinstance(segment_time, (int, float)) or segment_time < 1 or segment_time > 60: + raise FFmpegCommandError(f"Invalid segment time: {segment_time}") + + # Validate variants + if 'variants' in params: + if not isinstance(params['variants'], list): + raise FFmpegCommandError("Variants must be a list") + + for i, variant in enumerate(params['variants']): + if not isinstance(variant, dict): + raise FFmpegCommandError(f"Variant {i} must be a dictionary") + + # Validate resolution + if 'resolution' in variant: + resolution = variant['resolution'] + if not isinstance(resolution, str) or 'x' not in resolution: + raise FFmpegCommandError(f"Invalid resolution format in variant {i}: {resolution}") + + # Validate bitrate + if 'bitrate' in variant: + self._validate_bitrate(variant['bitrate'], f"variant_{i}_bitrate") + def _validate_string_parameter(self, value: str, param_name: str): """Validate string parameters for command injection.""" if not isinstance(value, str): @@ -532,6 +569,42 @@ def _handle_stream_map(self, params: Dict[str, Any]) -> List[str]: return cmd_parts + def _handle_streaming(self, params: Dict[str, Any]) -> List[str]: + """Handle adaptive streaming (HLS/DASH) output.""" + cmd_parts = [] + + streaming_format = params.get('format', 'hls') + segment_time = params.get('segment_time', 6) + + if streaming_format == 'hls': + # HLS streaming configuration + cmd_parts.extend(['-f', 'hls']) + cmd_parts.extend(['-hls_time', str(segment_time)]) + cmd_parts.extend(['-hls_playlist_type', 'vod']) + cmd_parts.extend(['-hls_segment_filename', 'segment_%03d.ts']) + + # Master playlist for multiple variants + if 'variants' in params: + cmd_parts.extend(['-master_pl_name', 'master.m3u8']) + + # Add variant streams + for i, variant in enumerate(params['variants']): + if 'resolution' in variant and 'bitrate' in variant: + resolution = variant['resolution'] + bitrate = variant['bitrate'] + + # Add stream map for this variant + cmd_parts.extend(['-var_stream_map', f'v:{i},a:{i}']) + + elif streaming_format == 'dash': + # DASH streaming configuration + cmd_parts.extend(['-f', 'dash']) + cmd_parts.extend(['-seg_duration', str(segment_time)]) + cmd_parts.extend(['-use_template', '1']) + cmd_parts.extend(['-use_timeline', '1']) + + return cmd_parts + def _handle_global_options(self, options: Dict[str, Any]) -> List[str]: """Handle global FFmpeg options.""" cmd_parts = [] @@ -732,7 +805,7 @@ async def get_file_duration(self, file_path: str) -> float: def validate_operations(self, operations: List[Dict[str, Any]]) -> bool: """Validate operations before processing.""" - valid_operations = {'transcode', 'trim', 'watermark', 'filter', 'stream_map'} + valid_operations = {'transcode', 'trim', 'watermark', 'filter', 'stream_map', 'streaming'} for operation in operations: if 'type' not in operation: