强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

FFmpeg 多媒体处理教程 / Python 集成

Python 集成

概述

Python 是 FFmpeg 自动化和集成的常用语言。本章介绍如何使用 Python 操作 FFmpeg,包括 FFmpeg-Python 库、子进程调用、复杂滤镜图构建和批量处理。

FFmpeg-Python 库

安装

pip install ffmpeg-python

基本用法

import ffmpeg

# 基本转码
ffmpeg.input('input.mp4').output('output.mp4').run()

# 指定编解码器
ffmpeg.input('input.mp4').output('output.mp4', 
    vcodec='libx264', 
    acodec='aac',
    crf=23
).run()

# 覆盖输出文件
ffmpeg.input('input.mp4').output('output.mp4').overwrite_output().run()

输入输出

import ffmpeg

# 单输入单输出
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 多输入
video = ffmpeg.input('video.mp4')
audio = ffmpeg.input('audio.mp3')
stream = ffmpeg.output(video, audio, 'output.mp4', vcodec='copy', acodec='aac')
ffmpeg.run(stream)

# 从 URL 输入
stream = ffmpeg.input('rtmp://server/live/stream')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 输出到管道
stream = ffmpeg.input('input.mp4')
process = ffmpeg.run_async(stream, pipe_stdout=True)

编解码器设置

import ffmpeg

# 视频编解码器
ffmpeg.input('input.mp4').output('output.mp4', 
    vcodec='libx264',
    acodec='aac',
    video_bitrate='2M',
    audio_bitrate='128k'
).run()

# CRF 模式
ffmpeg.input('input.mp4').output('output.mp4',
    vcodec='libx264',
    crf=23,
    preset='medium'
).run()

# 复制流
ffmpeg.input('input.mp4').output('output.mkv',
    vcodec='copy',
    acodec='copy'
).run()

视频滤镜

import ffmpeg

# 缩放
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'scale', 1280, 720)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 裁剪
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'crop', 640, 480, 0, 0)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 旋转
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'transpose', 1)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

音频滤镜

import ffmpeg

# 音量调整
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'volume', 2.0)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 淡入淡出
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'afade', t='in', st=0, d=3)
stream = ffmpeg.filter(stream, 'afade', t='out', st=57, d=3)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

复杂滤镜图

import ffmpeg

# 画中画
main = ffmpeg.input('main.mp4')
pip = ffmpeg.input('pip.mp4')
pip = ffmpeg.filter(pip, 'scale', 320, 240)
stream = ffmpeg.overlay(main, pip, x='W-w-10', y='H-h-10')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

# 水印叠加
video = ffmpeg.input('input.mp4')
watermark = ffmpeg.input('watermark.png')
stream = ffmpeg.overlay(video, watermark, x=10, y=10)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)

获取媒体信息

import ffmpeg

# 获取文件信息
info = ffmpeg.probe('input.mp4')

# 打印信息
print(f"格式: {info['format']['format_name']}")
print(f"时长: {info['format']['duration']} 秒")
print(f"大小: {info['format']['size']} 字节")

# 获取流信息
for stream in info['streams']:
    print(f"流类型: {stream['codec_type']}")
    print(f"编解码器: {stream['codec_name']}")

错误处理

import ffmpeg

try:
    ffmpeg.input('input.mp4').output('output.mp4').run()
except ffmpeg.Error as e:
    print(f"FFmpeg 错误: {e.stderr.decode()}")

子进程调用

基本调用

import subprocess

# 基本命令
cmd = ['ffmpeg', '-i', 'input.mp4', 'output.mp4']
subprocess.run(cmd)

# 带参数
cmd = [
    'ffmpeg', '-i', 'input.mp4',
    '-c:v', 'libx264', '-crf', '23',
    '-c:a', 'aac', '-b:a', '128k',
    'output.mp4'
]
subprocess.run(cmd)

捕获输出

import subprocess

# 捕获标准输出和错误
cmd = ['ffmpeg', '-i', 'input.mp4', 'output.mp4']
result = subprocess.run(cmd, capture_output=True, text=True)

print("标准输出:", result.stdout)
print("标准错误:", result.stderr)
print("返回码:", result.returncode)

实时输出

import subprocess

# 实时显示输出
cmd = ['ffmpeg', '-i', 'input.mp4', '-c:v', 'libx264', 'output.mp4']
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, universal_newlines=True)

for line in process.stderr:
    if 'time=' in line:
        print(line.strip())

process.wait()

进度监控

import subprocess
import re
import json

def get_duration(input_file):
    """获取视频时长"""
    cmd = [
        'ffprobe', '-v', 'quiet',
        '-print_format', 'json',
        '-show_format', input_file
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    info = json.loads(result.stdout)
    return float(info['format']['duration'])

def transcode_with_progress(input_file, output_file):
    """带进度的转码"""
    duration = get_duration(input_file)
    
    cmd = [
        'ffmpeg', '-i', input_file,
        '-c:v', 'libx264', '-crf', '23',
        '-c:a', 'aac',
        output_file
    ]
    
    process = subprocess.Popen(cmd, stderr=subprocess.PIPE, universal_newlines=True)
    
    for line in process.stderr:
        time_match = re.search(r'time=(\d+:\d+:\d+\.\d+)', line)
        if time_match:
            time_str = time_match.group(1)
            h, m, s = time_str.split(':')
            current = float(h) * 3600 + float(m) * 60 + float(s)
            progress = (current / duration) * 100
            print(f"\r进度: {progress:.1f}%", end='')
    
    process.wait()
    print("\n转码完成")

高级用法

并行处理

import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor

def process_file(input_file, output_file):
    """处理单个文件"""
    try:
        ffmpeg.input(input_file).output(
            output_file,
            vcodec='libx264',
            crf=23,
            acodec='aac'
        ).overwrite_output().run(quiet=True)
        return True
    except ffmpeg.Error:
        return False

def batch_process(input_dir, output_dir, max_workers=4):
    """批量处理"""
    os.makedirs(output_dir, exist_ok=True)
    
    files = [
        f for f in os.listdir(input_dir)
        if f.endswith(('.mp4', '.avi', '.mkv'))
    ]
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for f in files:
            input_path = os.path.join(input_dir, f)
            output_path = os.path.join(output_dir, f)
            futures.append(executor.submit(process_file, input_path, output_path))
        
        for future in futures:
            future.result()

流媒体处理

import ffmpeg

# RTMP 推流
ffmpeg.input('input.mp4').output(
    'rtmp://server/live/stream',
    vcodec='libx264',
    acodec='aac',
    preset='fast',
    tune='zerolatency',
    video_bitrate='3M',
    maxrate='3M',
    bufsize='6M',
    f='flv'
).run()

# HLS 输出
ffmpeg.input('input.mp4').output(
    'playlist.m3u8',
    vcodec='libx264',
    acodec='aac',
    hls_time=4,
    hls_list_size=10,
    hls_segment_filename='segment_%03d.ts'
).run()

硬件加速

import ffmpeg

# NVIDIA GPU 加速
ffmpeg.input('input.mp4').output(
    'output.mp4',
    vcodec='h264_nvenc',
    preset='fast',
    video_bitrate='3M'
).run()

# Intel QSV 加速
ffmpeg.input('input.mp4').output(
    'output.mp4',
    vcodec='h264_qsv',
    preset='fast',
    video_bitrate='3M'
).run()

多输出

import ffmpeg

# 多码率输出
stream = ffmpeg.input('input.mp4')

# 1080p
stream_1080p = ffmpeg.filter(stream, 'scale', 1920, 1080)
output_1080p = ffmpeg.output(stream_1080p, 'output_1080p.mp4',
    vcodec='libx264', crf=23, preset='medium')

# 720p
stream_720p = ffmpeg.filter(stream, 'scale', 1280, 720)
output_720p = ffmpeg.output(stream_720p, 'output_720p.mp4',
    vcodec='libx264', crf=23, preset='medium')

# 480p
stream_480p = ffmpeg.filter(stream, 'scale', 854, 480)
output_480p = ffmpeg.output(stream_480p, 'output_480p.mp4',
    vcodec='libx264', crf=23, preset='medium')

# 运行所有输出
ffmpeg.run(output_1080p, output_720p, output_480p)

封装类

FFmpeg 处理器类

import ffmpeg
import subprocess
import json
import os
from typing import Optional, Dict, List

class FFmpegProcessor:
    """FFmpeg 处理器"""
    
    def __init__(self, ffmpeg_path: str = 'ffmpeg', ffprobe_path: str = 'ffprobe'):
        self.ffmpeg_path = ffmpeg_path
        self.ffprobe_path = ffprobe_path
    
    def get_info(self, input_file: str) -> Dict:
        """获取媒体信息"""
        cmd = [
            self.ffprobe_path,
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            input_file
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return json.loads(result.stdout)
    
    def get_duration(self, input_file: str) -> float:
        """获取时长"""
        info = self.get_info(input_file)
        return float(info['format']['duration'])
    
    def transcode(
        self,
        input_file: str,
        output_file: str,
        vcodec: str = 'libx264',
        acodec: str = 'aac',
        crf: int = 23,
        preset: str = 'medium',
        video_bitrate: Optional[str] = None,
        audio_bitrate: Optional[str] = '128k',
        overwrite: bool = True
    ) -> bool:
        """转码"""
        try:
            stream = ffmpeg.input(input_file)
            
            kwargs = {
                'vcodec': vcodec,
                'acodec': acodec,
                'crf': crf,
                'preset': preset,
            }
            
            if video_bitrate:
                kwargs['video_bitrate'] = video_bitrate
            if audio_bitrate:
                kwargs['audio_bitrate'] = audio_bitrate
            
            stream = ffmpeg.output(stream, output_file, **kwargs)
            
            if overwrite:
                stream = ffmpeg.overwrite_output(stream)
            
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def scale(
        self,
        input_file: str,
        output_file: str,
        width: int,
        height: int,
        **kwargs
    ) -> bool:
        """缩放"""
        try:
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.filter(stream, 'scale', width, height)
            stream = ffmpeg.output(stream, output_file, **kwargs)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def extract_audio(
        self,
        input_file: str,
        output_file: str,
        acodec: str = 'libmp3lame',
        audio_bitrate: str = '192k'
    ) -> bool:
        """提取音频"""
        try:
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.output(stream, output_file,
                vn=None,
                acodec=acodec,
                audio_bitrate=audio_bitrate)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def add_watermark(
        self,
        input_file: str,
        watermark_file: str,
        output_file: str,
        x: int = 10,
        y: int = 10,
        **kwargs
    ) -> bool:
        """添加水印"""
        try:
            video = ffmpeg.input(input_file)
            watermark = ffmpeg.input(watermark_file)
            stream = ffmpeg.overlay(video, watermark, x=x, y=y)
            stream = ffmpeg.output(stream, output_file, **kwargs)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def create_thumbnail(
        self,
        input_file: str,
        output_file: str,
        timestamp: str = '00:00:10'
    ) -> bool:
        """创建缩略图"""
        try:
            stream = ffmpeg.input(input_file, ss=timestamp)
            stream = ffmpeg.output(stream, output_file, vframes=1)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def concat(
        self,
        input_files: List[str],
        output_file: str,
        **kwargs
    ) -> bool:
        """拼接视频"""
        try:
            inputs = [ffmpeg.input(f) for f in input_files]
            stream = ffmpeg.concat(*inputs)
            stream = ffmpeg.output(stream, output_file, **kwargs)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False

使用示例

# 创建处理器实例
processor = FFmpegProcessor()

# 获取文件信息
info = processor.get_info('input.mp4')
print(f"时长: {processor.get_duration('input.mp4')} 秒")

# 转码
processor.transcode('input.mp4', 'output.mp4', crf=23, preset='slow')

# 缩放
processor.scale('input.mp4', 'output_720p.mp4', 1280, 720)

# 提取音频
processor.extract_audio('input.mp4', 'audio.mp3')

# 添加水印
processor.add_watermark('input.mp4', 'watermark.png', 'output_watermark.mp4')

# 创建缩略图
processor.create_thumbnail('input.mp4', 'thumbnail.jpg')

# 拼接视频
processor.concat(['part1.mp4', 'part2.mp4', 'part3.mp4'], 'output.mp4')

批量处理工具

批量转码工具

import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchTranscoder:
    """批量转码器"""
    
    def __init__(
        self,
        input_dir: str,
        output_dir: str,
        max_workers: int = 4,
        vcodec: str = 'libx264',
        acodec: str = 'aac',
        crf: int = 23,
        preset: str = 'medium'
    ):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.max_workers = max_workers
        self.vcodec = vcodec
        self.acodec = acodec
        self.crf = crf
        self.preset = preset
        
        os.makedirs(output_dir, exist_ok=True)
    
    def _process_file(self, input_file: str) -> Tuple[str, bool, str]:
        """处理单个文件"""
        filename = os.path.basename(input_file)
        output_file = os.path.join(self.output_dir, filename)
        
        try:
            ffmpeg.input(input_file).output(
                output_file,
                vcodec=self.vcodec,
                acodec=self.acodec,
                crf=self.crf,
                preset=self.preset
            ).overwrite_output().run(quiet=True)
            
            return filename, True, "成功"
        except ffmpeg.Error as e:
            return filename, False, str(e.stderr.decode())
    
    def process(self) -> List[Tuple[str, bool, str]]:
        """批量处理"""
        # 获取文件列表
        files = [
            os.path.join(self.input_dir, f)
            for f in os.listdir(self.input_dir)
            if f.endswith(('.mp4', '.avi', '.mkv', '.mov'))
        ]
        
        logger.info(f"找到 {len(files)} 个文件")
        
        results = []
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self._process_file, f): f
                for f in files
            }
            
            for future in as_completed(futures):
                filename, success, message = future.result()
                if success:
                    logger.info(f"✓ {filename}")
                else:
                    logger.error(f"✗ {filename}: {message}")
                results.append((filename, success, message))
        
        # 统计
        success_count = sum(1 for _, success, _ in results if success)
        logger.info(f"完成: {success_count}/{len(results)} 成功")
        
        return results

# 使用示例
if __name__ == '__main__':
    transcoder = BatchTranscoder(
        input_dir='./input',
        output_dir='./output',
        max_workers=4,
        crf=23,
        preset='medium'
    )
    
    results = transcoder.process()

多分辨率批量工具

import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor

class MultiResolutionBatch:
    """多分辨率批量处理"""
    
    def __init__(self, input_dir: str, output_dir: str, max_workers: int = 4):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.max_workers = max_workers
        
        self.resolutions = {
            '1080p': (1920, 1080),
            '720p': (1280, 720),
            '480p': (854, 480),
        }
        
        for res in self.resolutions:
            os.makedirs(os.path.join(output_dir, res), exist_ok=True)
    
    def _process_file(self, input_file: str, resolution: str, width: int, height: int):
        """处理单个文件"""
        filename = os.path.basename(input_file)
        output_file = os.path.join(self.output_dir, resolution, filename)
        
        try:
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.filter(stream, 'scale', width, height)
            stream = ffmpeg.output(stream, output_file,
                vcodec='libx264',
                acodec='aac',
                crf=23,
                preset='fast')
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
            return True
        except ffmpeg.Error:
            return False
    
    def process(self):
        """批量处理"""
        files = [
            os.path.join(self.input_dir, f)
            for f in os.listdir(self.input_dir)
            if f.endswith(('.mp4', '.avi', '.mkv'))
        ]
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for file in files:
                for resolution, (width, height) in self.resolutions.items():
                    futures.append(
                        executor.submit(self._process_file, file, resolution, width, height)
                    )
            
            for future in futures:
                future.result()

命令行工具封装

CLI 工具

#!/usr/bin/env python3
"""FFmpeg 命令行工具"""

import argparse
import ffmpeg
import sys
import os

def transcode(args):
    """转码命令"""
    try:
        stream = ffmpeg.input(args.input)
        stream = ffmpeg.output(stream, args.output,
            vcodec=args.vcodec,
            acodec=args.acodec,
            crf=args.crf,
            preset=args.preset)
        stream = ffmpeg.overwrite_output(stream)
        ffmpeg.run(stream)
        print(f"转码完成: {args.output}")
    except ffmpeg.Error as e:
        print(f"转码失败: {e.stderr.decode()}", file=sys.stderr)
        sys.exit(1)

def scale(args):
    """缩放命令"""
    try:
        stream = ffmpeg.input(args.input)
        stream = ffmpeg.filter(stream, 'scale', args.width, args.height)
        stream = ffmpeg.output(stream, args.output,
            vcodec='libx264',
            acodec='aac',
            crf=23)
        stream = ffmpeg.overwrite_output(stream)
        ffmpeg.run(stream)
        print(f"缩放完成: {args.output}")
    except ffmpeg.Error as e:
        print(f"缩放失败: {e.stderr.decode()}", file=sys.stderr)
        sys.exit(1)

def extract_audio(args):
    """提取音频命令"""
    try:
        stream = ffmpeg.input(args.input)
        stream = ffmpeg.output(stream, args.output,
            vn=None,
            acodec=args.acodec,
            audio_bitrate=args.bitrate)
        stream = ffmpeg.overwrite_output(stream)
        ffmpeg.run(stream)
        print(f"音频提取完成: {args.output}")
    except ffmpeg.Error as e:
        print(f"音频提取失败: {e.stderr.decode()}", file=sys.stderr)
        sys.exit(1)

def info(args):
    """信息命令"""
    try:
        probe = ffmpeg.probe(args.input)
        
        print(f"文件: {args.input}")
        print(f"格式: {probe['format']['format_name']}")
        print(f"时长: {float(probe['format']['duration']):.2f} 秒")
        print(f"大小: {int(probe['format']['size']) / 1024 / 1024:.2f} MB")
        
        for stream in probe['streams']:
            if stream['codec_type'] == 'video':
                print(f"\n视频流:")
                print(f"  编解码器: {stream['codec_name']}")
                print(f"  分辨率: {stream['width']}x{stream['height']}")
                print(f"  帧率: {stream.get('r_frame_rate', 'N/A')}")
            elif stream['codec_type'] == 'audio':
                print(f"\n音频流:")
                print(f"  编解码器: {stream['codec_name']}")
                print(f"  采样率: {stream.get('sample_rate', 'N/A')}")
                print(f"  声道数: {stream.get('channels', 'N/A')}")
    except ffmpeg.Error as e:
        print(f"获取信息失败: {e.stderr.decode()}", file=sys.stderr)
        sys.exit(1)

def main():
    parser = argparse.ArgumentParser(description='FFmpeg 工具')
    subparsers = parser.add_subparsers(dest='command', help='子命令')
    
    # 转码命令
    transcode_parser = subparsers.add_parser('transcode', help='转码')
    transcode_parser.add_argument('input', help='输入文件')
    transcode_parser.add_argument('output', help='输出文件')
    transcode_parser.add_argument('--vcodec', default='libx264', help='视频编解码器')
    transcode_parser.add_argument('--acodec', default='aac', help='音频编解码器')
    transcode_parser.add_argument('--crf', type=int, default=23, help='CRF 值')
    transcode_parser.add_argument('--preset', default='medium', help='编码预设')
    
    # 缩放命令
    scale_parser = subparsers.add_parser('scale', help='缩放')
    scale_parser.add_argument('input', help='输入文件')
    scale_parser.add_argument('output', help='输出文件')
    scale_parser.add_argument('--width', type=int, required=True, help='宽度')
    scale_parser.add_argument('--height', type=int, required=True, help='高度')
    
    # 提取音频命令
    audio_parser = subparsers.add_parser('extract-audio', help='提取音频')
    audio_parser.add_argument('input', help='输入文件')
    audio_parser.add_argument('output', help='输出文件')
    audio_parser.add_argument('--acodec', default='libmp3lame', help='音频编解码器')
    audio_parser.add_argument('--bitrate', default='192k', help='音频码率')
    
    # 信息命令
    info_parser = subparsers.add_parser('info', help='获取信息')
    info_parser.add_argument('input', help='输入文件')
    
    args = parser.parse_args()
    
    if args.command == 'transcode':
        transcode(args)
    elif args.command == 'scale':
        scale(args)
    elif args.command == 'extract-audio':
        extract_audio(args)
    elif args.command == 'info':
        info(args)
    else:
        parser.print_help()

if __name__ == '__main__':
    main()

使用示例

# 转码
python ffmpeg_tool.py transcode input.mp4 output.mp4 --crf 23 --preset slow

# 缩放
python ffmpeg_tool.py scale input.mp4 output.mp4 --width 1280 --height 720

# 提取音频
python ffmpeg_tool.py extract-audio input.mp4 audio.mp3 --bitrate 320k

# 获取信息
python ffmpeg_tool.py info input.mp4

注意事项

  1. 错误处理:始终捕获 ffmpeg.Error 异常
  2. 资源管理:长时间运行的进程注意资源释放
  3. 并发控制:并行处理时控制线程/进程数量
  4. 日志记录:记录处理日志便于调试
  5. 版本兼容:注意 ffmpeg-python 版本与 FFmpeg 版本的兼容性

业务场景

场景 1:视频处理 API

from flask import Flask, request, jsonify
import ffmpeg
import os
import uuid

app = Flask(__name__)

@app.route('/api/transcode', methods=['POST'])
def transcode():
    """转码 API"""
    if 'file' not in request.files:
        return jsonify({'error': '没有文件'}), 400
    
    file = request.files['file']
    filename = f"{uuid.uuid4()}.mp4"
    input_path = os.path.join('/tmp', filename)
    output_path = os.path.join('/tmp', f"output_{filename}")
    
    file.save(input_path)
    
    try:
        ffmpeg.input(input_path).output(
            output_path,
            vcodec='libx264',
            acodec='aac',
            crf=23
        ).overwrite_output().run(quiet=True)
        
        return jsonify({
            'success': True,
            'output': output_path
        })
    except ffmpeg.Error as e:
        return jsonify({'error': str(e.stderr.decode())}), 500

@app.route('/api/info', methods=['POST'])
def info():
    """获取信息 API"""
    if 'file' not in request.files:
        return jsonify({'error': '没有文件'}), 400
    
    file = request.files['file']
    filename = f"{uuid.uuid4()}.mp4"
    input_path = os.path.join('/tmp', filename)
    
    file.save(input_path)
    
    try:
        probe = ffmpeg.probe(input_path)
        return jsonify({
            'success': True,
            'info': probe
        })
    except ffmpeg.Error as e:
        return jsonify({'error': str(e.stderr.decode())}), 500

场景 2:视频处理队列

import queue
import threading
import ffmpeg

class VideoQueue:
    """视频处理队列"""
    
    def __init__(self, max_workers: int = 4):
        self.queue = queue.Queue()
        self.max_workers = max_workers
        self.workers = []
        self.running = False
    
    def start(self):
        """启动队列"""
        self.running = True
        for _ in range(self.max_workers):
            worker = threading.Thread(target=self._worker)
            worker.start()
            self.workers.append(worker)
    
    def stop(self):
        """停止队列"""
        self.running = False
        for worker in self.workers:
            worker.join()
    
    def add_task(self, input_file: str, output_file: str, **kwargs):
        """添加任务"""
        self.queue.put((input_file, output_file, kwargs))
    
    def _worker(self):
        """工作线程"""
        while self.running:
            try:
                input_file, output_file, kwargs = self.queue.get(timeout=1)
                self._process_file(input_file, output_file, **kwargs)
                self.queue.task_done()
            except queue.Empty:
                continue
    
    def _process_file(self, input_file: str, output_file: str, **kwargs):
        """处理文件"""
        try:
            ffmpeg.input(input_file).output(
                output_file, **kwargs
            ).overwrite_output().run(quiet=True)
            print(f"完成: {output_file}")
        except ffmpeg.Error as e:
            print(f"失败: {output_file} - {e.stderr.decode()}")

# 使用示例
queue = VideoQueue(max_workers=4)
queue.start()

queue.add_task('input1.mp4', 'output1.mp4', vcodec='libx264', crf=23)
queue.add_task('input2.mp4', 'output2.mp4', vcodec='libx264', crf=23)

queue.queue.join()
queue.stop()

扩展阅读

  1. FFmpeg-Python 文档
  2. FFmpeg 官方文档
  3. Python subprocess 文档
  4. Python 并发编程

总结

本章介绍了 FFmpeg 的 Python 集成,包括:

  • FFmpeg-Python 库的使用
  • 子进程调用方法
  • 复杂滤镜图构建
  • 批量处理工具
  • CLI 工具封装

掌握 Python 集成可以帮助您构建自动化的视频处理系统。