Skip to content

Commit 04bf03c

Browse files
committed
asyncgenerator
1 parent d318992 commit 04bf03c

File tree

10 files changed

+91
-91
lines changed

10 files changed

+91
-91
lines changed

README.md

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,38 @@ In general to show the latest techniques, Python ≥ 3.7 is required.
99

1010
## FFprobe
1111

12-
* `probe_sync`: Use FFprobe to retrieve file metadata synchronously
13-
* `probe_coroutine`: Use FFprobe to retrieve file metadata asynchronously using Python `asyncio` coroutine event loop.
12+
Both a synchronous (traditional for loop) and an asynchronous pipeline are demonstrated.
13+
They call the FFprobe executable to return JSON-formatted metadata.
14+
15+
### probe_sync.py
16+
17+
Retrieve file metadata synchronously.
18+
19+
### probe_coroutine.py
20+
21+
Retrieve file metadata in an asynchronous pipeline (asyncio generator) using the Python `asyncio` coroutine event loop.
1422

1523
## FFplay
1624

1725
I like to test asynchronous techniques with video playback, as it makes some effects obvious.
26+
The FFplay asyncio example is more advanced than the FFprobe example.
27+
In the FFprobe example, the lazy asyncio generator produces metadata concurrently as fast as it's requested.
28+
There is no resource throttling in the FFprobe example, so the CPU could become overwhelmed with context switching.
29+
30+
The FFplay example in contrast is an example of a task using resource throttling via asyncio.Queue.
31+
Queueing could also be implemented for an FFprobe-style task if desired.
32+
However, the rationale employed is that the FFprobe task is overall lightweight, and thus other parts of the pipeline inherently limit resource utilization.
33+
If the FFprobe task were instead run with asyncio.gather(), resource utilization could get too high.
34+
Thus we have a "win-win" by using an asyncio generator for FFprobe: the throttling comes implicitly from other parts of the pipeline.
35+
36+
37+
### play_threadpool.py
38+
39+
Even though coroutines are more efficient in many applications, the syntax of `concurrent.futures.ThreadPoolExecutor` is perhaps the simplest possible way to spawn independent processes in a controlled fashion.
40+
41+
### play_coroutine.py
1842

19-
* `play_threadpool`: Even though coroutines are more efficient, the syntax of `concurrent.futures.ThreadPoolExecutor` is perhaps the simplest possible way to spawn independent processes in a controlled fashion
20-
* `play_coroutine`: Use Python `asyncio` coroutine event loop to spawn processes.
43+
Use the Python `asyncio` coroutine event loop to spawn processes.
2144

2245
### Fortran
2346

asyncioffmpeg/ffplay.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,9 @@
77
from pathlib import Path
88
import shutil
99
import sys
10-
from typing import List, Generator, Union
10+
from typing import Iterable
1111
import os
1212

13-
if sys.version_info < (3, 7):
14-
raise RuntimeError('Python >= 3.7 required')
15-
16-
1713
FFPLAY = shutil.which('ffplay')
1814
if not FFPLAY:
1915
raise FileNotFoundError('FFPLAY not found')
@@ -28,9 +24,8 @@ async def ffplay(queue: asyncio.Queue):
2824

2925
while True:
3026
filein = await queue.get()
31-
assert isinstance(filein, Path)
3227

33-
cmd = [FFPLAY, '-v', 'warning', '-autoexit', str(filein)]
28+
cmd = [FFPLAY, '-loglevel', 'warning', '-autoexit', str(filein)]
3429

3530
proc = await asyncio.create_subprocess_exec(*cmd)
3631

@@ -42,7 +37,7 @@ async def ffplay(queue: asyncio.Queue):
4237
queue.task_done()
4338

4439

45-
async def main(flist: Union[List[Path], Generator[Path, None, None]]):
40+
async def main(flist: Iterable[Path]):
4641

4742
Ntask = os.cpu_count() # includes logical cores
4843
if not isinstance(Ntask, int):

asyncioffmpeg/ffprobe.py

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,55 @@
11
#!/usr/bin/env python
22
"""
3-
This file contains connections from Python to FFprobe, useful for extracting
3+
use Python with FFprobe to extract
44
JSON metadata from any kind of media file that FFprobe can read.
5-
6-
Moreover, this file demonstrates asynchronous and synchronous access to
7-
thread-safe subprocesses
85
"""
96
import asyncio
107
import json
8+
import subprocess
9+
from typing import Dict, Iterable, AsyncGenerator
1110
from pathlib import Path
1211
import shutil
13-
import sys
1412

1513
FFPROBE = shutil.which('ffprobe')
1614
if not FFPROBE:
1715
raise FileNotFoundError('FFPROBE not found')
1816

19-
TIMEOUT = 5.0 # 2.0 is too short for Windows
17+
18+
async def get_meta(files: Iterable[Path]) -> AsyncGenerator[dict, None]: # mypy bug Dict[str, str]
19+
20+
futures = [ffprobe(file) for file in files]
21+
for future in asyncio.as_completed(futures):
22+
try:
23+
meta = await future
24+
except asyncio.TimeoutError:
25+
continue
26+
27+
yield meta
2028

2129

22-
# %% Asynchronous FFprobe
23-
async def ffprobe(filein: Path) -> dict:
30+
async def ffprobe(file: Path) -> Dict[str, str]:
2431
""" get media metadata """
25-
assert isinstance(FFPROBE, str)
2632

27-
proc = await asyncio.create_subprocess_exec(*[FFPROBE, '-v', 'warning',
33+
proc = await asyncio.create_subprocess_exec(*[FFPROBE, '-loglevel', 'warning',
2834
'-print_format', 'json',
2935
'-show_streams',
3036
'-show_format',
31-
str(filein)],
32-
stdout=asyncio.subprocess.PIPE,
33-
stderr=asyncio.subprocess.PIPE)
37+
str(file)],
38+
stdout=asyncio.subprocess.PIPE)
3439

35-
stdout, stderr = await proc.communicate()
40+
stdout, _ = await proc.communicate()
3641

3742
return json.loads(stdout.decode('utf8'))
3843

3944

40-
async def get_meta(filein: Path) -> dict:
41-
try:
42-
meta = await asyncio.wait_for(ffprobe(filein), timeout=TIMEOUT)
43-
return meta
44-
except asyncio.TimeoutError:
45-
print('timeout ', filein, file=sys.stderr)
46-
return {}
45+
def ffprobe_sync(file: Path) -> dict: # mypy bug Dict[str, str]
46+
""" get media metadata """
47+
48+
meta = subprocess.check_output([FFPROBE, '-v', 'warning',
49+
'-print_format', 'json',
50+
'-show_streams',
51+
'-show_format',
52+
str(file)],
53+
text=True)
54+
55+
return json.loads(meta)

asyncioffmpeg/ffprobe_sync.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

mypy.ini

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
[mypy]
22
ignore_missing_imports = True
33
strict_optional = False
4-
allow_redefinition = True
4+
allow_redefinition = True
5+
show_error_context = False
6+
show_column_numbers = True

play_coroutine.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python
22
import asyncio
33
import os
4+
import sys
45
from pathlib import Path
56
from argparse import ArgumentParser
67

@@ -12,15 +13,16 @@
1213
description="Plays media files asynchronously with FFplay")
1314
p.add_argument('path', help='directory where media files are kept')
1415
p.add_argument('-suffix', help='file suffixes of desired media file types',
15-
nargs='+', default=['.mp4', '.avi', '.ogv', '.wmv', '.flv', '.mov'])
16+
nargs='+', default=['.mp4', '.avi', '.ogv', '.wmv', '.flv', '.mov', '.ogg'])
1617
P = p.parse_args()
1718

1819
path = Path(P.path).expanduser()
20+
if not path.is_dir():
21+
raise NotADirectoryError(path)
1922

2023
flist = (f for f in path.iterdir() if f.is_file() and f.suffix in P.suffix)
2124

22-
if os.name == 'nt':
23-
loop = asyncio.ProactorEventLoop() # type: ignore
24-
loop.run_until_complete(play.main(flist))
25-
else:
26-
asyncio.run(play.main(flist))
25+
if os.name == 'nt' and sys.version_info < (3, 8):
26+
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # type: ignore
27+
28+
asyncio.run(play.main(flist))

probe_coroutine.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
#!/usr/bin/env python
22
from argparse import ArgumentParser
33
from pathlib import Path
4-
from typing import List
4+
from typing import Sequence
55
import asyncio
6+
import sys
67
import os
78

89
import asyncioffmpeg.ffprobe as probe
910

1011

11-
async def main(path: Path, suffix: List[str]):
12+
async def main(path: Path, suffix: Sequence[str]):
1213

13-
path = Path(P.path).expanduser()
14+
path = Path(path).expanduser()
15+
if not path.is_dir():
16+
raise NotADirectoryError(path)
1417

15-
flist = (f for f in path.iterdir() if f.is_file() and f.suffix in suffix)
18+
files = (f for f in path.iterdir() if f.is_file() and f.suffix in suffix)
1619

17-
futures = [probe.get_meta(f) for f in flist]
18-
19-
metas = await asyncio.gather(*futures)
20-
21-
for meta in metas:
20+
async for meta in probe.get_meta(files):
2221
if not meta:
2322
continue
2423
fn = meta['format']['filename']
@@ -31,11 +30,10 @@ async def main(path: Path, suffix: List[str]):
3130
description="Get media metadata asynchronously with FFprobe")
3231
p.add_argument('path', help='directory where media files are kept')
3332
p.add_argument('-suffix', help='file suffixes of desired media file types',
34-
nargs='+', default=['.mp4', '.avi', '.ogv'])
33+
nargs='+', default=['.mp4', '.avi', '.ogv', '.ogg'])
3534
P = p.parse_args()
3635

37-
if os.name == 'nt':
38-
loop = asyncio.ProactorEventLoop() # type: ignore
39-
loop.run_until_complete(main(P.path, P.suffix))
40-
else:
41-
asyncio.run(main(P.path, P.suffix))
36+
if os.name == 'nt' and sys.version_info < (3, 8):
37+
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # type: ignore
38+
39+
asyncio.run(main(P.path, P.suffix))

probe_sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from pathlib import Path
33
import shutil
44
from argparse import ArgumentParser
5-
import asyncioffmpeg.ffprobe_sync as probe
5+
import asyncioffmpeg.ffprobe as probe
66

77
FFPROBE = shutil.which('ffprobe')
88
if not FFPROBE:
@@ -14,7 +14,7 @@
1414
description="Get media metadata synchronously with FFprobe")
1515
p.add_argument('path', help='directory where media files are kept')
1616
p.add_argument('-suffix', help='file suffixes of desired media file types',
17-
nargs='+', default=['.mp4', '.avi', '.ogv'])
17+
nargs='+', default=['.mp4', '.avi', '.ogv', '.ogg'])
1818
P = p.parse_args()
1919

2020
path = Path(P.path).expanduser()

setup.cfg

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ classifiers =
2020
Topic :: Multimedia :: Video :: Conversion
2121
Topic :: System :: Networking
2222
Topic :: Utilities
23-
license_file = LICENSE
23+
license_files =
24+
LICENSE.txt
2425
long_description = file: README.md
2526
long_description_content_type = text/markdown
2627

@@ -29,7 +30,6 @@ python_requires = >= 3.7
2930
setup_requires =
3031
setuptools >= 38.6
3132
pip >= 10
32-
twine >= 1.11
3333
packages = find:
3434
scripts =
3535
demo.py

tests/test_ffprobe.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import os
33
import pytest
44
import asyncioffmpeg.ffprobe as probe
5-
import asyncioffmpeg.ffprobe_sync as probe_sync
65

76

87
def get_duration(meta: dict) -> float:
@@ -14,14 +13,14 @@ def get_duration(meta: dict) -> float:
1413
async def test_ffprobe(genpat):
1514
vid = genpat
1615

17-
meta = await probe.get_meta(vid)
18-
assert get_duration(meta) == pytest.approx(5.)
16+
async for meta in probe.get_meta([vid]):
17+
assert get_duration(meta) == pytest.approx(5.)
1918

2019

2120
def test_ffprobe_sync(genpat):
2221
vid = genpat
2322

24-
meta = probe_sync.ffprobe_sync(vid)
23+
meta = probe.ffprobe_sync(vid)
2524
assert get_duration(meta) == pytest.approx(5.)
2625

2726

0 commit comments

Comments
 (0)