Skip to content

Commit 6ce2558

Browse files
committed
py35 compat ffprobe
py35 ffplay black, py35 throughout vers
1 parent 04bf03c commit 6ce2558

File tree

15 files changed

+302
-192
lines changed

15 files changed

+302
-192
lines changed

.travis.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ git:
99
python:
1010
- 3.8-dev
1111
- 3.7
12+
- 3.6
13+
- 3.5
1214

1315
addons:
1416
apt:
@@ -18,12 +20,17 @@ addons:
1820
matrix:
1921
include:
2022
- os: linux
23+
name: PEP8 MyPy
2124
python: 3.7
22-
install: pip install -e .[tests]
25+
install: pip install -e .[tests,cov]
2326
script:
2427
- flake8
2528
- mypy .
29+
- pytest -r a -v
30+
after_success:
31+
- pytest --cov
32+
- coveralls
2633

2734
install: pip install -e .[tests]
2835

29-
script: pytest -v
36+
script: pytest -r a -v

asyncioffmpeg/ffplay.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,18 @@
33
This example uses a finite number of workers, rather than slamming the system with endless subprocesses.
44
This is more effective than endless context switching for an overloaded CPU.
55
"""
6+
import queue
7+
import subprocess
68
import asyncio
79
from pathlib import Path
810
import shutil
911
import sys
1012
from typing import Iterable
1113
import os
1214

13-
FFPLAY = shutil.which('ffplay')
15+
FFPLAY = shutil.which("ffplay")
1416
if not FFPLAY:
15-
raise FileNotFoundError('FFPLAY not found')
17+
raise ImportError("FFPLAY not found")
1618

1719

1820
async def ffplay(queue: asyncio.Queue):
@@ -25,14 +27,14 @@ async def ffplay(queue: asyncio.Queue):
2527
while True:
2628
filein = await queue.get()
2729

28-
cmd = [FFPLAY, '-loglevel', 'warning', '-autoexit', str(filein)]
30+
cmd = [FFPLAY, "-loglevel", "warning", "-autoexit", str(filein)]
2931

3032
proc = await asyncio.create_subprocess_exec(*cmd)
3133

3234
ret = await proc.wait()
3335

3436
if ret != 0:
35-
print(filein, 'playback failure', file=sys.stderr)
37+
print(filein, "playback failure", file=sys.stderr)
3638

3739
queue.task_done()
3840

@@ -42,18 +44,39 @@ async def main(flist: Iterable[Path]):
4244
Ntask = os.cpu_count() # includes logical cores
4345
if not isinstance(Ntask, int):
4446
Ntask = 2
45-
# %% setup queue
47+
# %% setup queue
4648
queue = asyncio.Queue() # type: ignore
4749

4850
for f in flist:
4951
await queue.put(f)
50-
# %% setup Tasks
51-
tasks = [asyncio.create_task(ffplay(queue)) for i in range(Ntask)]
52+
# %% setup Tasks
53+
if sys.version_info >= (3, 7):
54+
tasks = [asyncio.create_task(ffplay(queue)) for i in range(Ntask)]
55+
else:
56+
tasks = [asyncio.ensure_future(ffplay(queue)) for i in range(Ntask)]
5257

5358
await queue.join()
5459

55-
# %% program done, teardown Tasks
60+
# %% program done, teardown Tasks
5661
for task in tasks:
5762
task.cancel()
5863

5964
await asyncio.gather(*tasks, return_exceptions=True)
65+
66+
67+
def ffplay_sync(qin: queue.Queue):
68+
"""
69+
Play media synchronously
70+
"""
71+
72+
while not qin.empty():
73+
filein = qin.get(timeout=1.0)
74+
75+
cmd = [FFPLAY, "-v", "warning", "-autoexit", str(filein)]
76+
77+
ret = subprocess.run(cmd)
78+
79+
if ret.returncode != 0:
80+
print(filein, "playback failure", cmd, file=sys.stderr)
81+
82+
qin.task_done()

asyncioffmpeg/ffplay_sync.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

asyncioffmpeg/ffprobe.py

Lines changed: 66 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,50 +6,88 @@
66
import asyncio
77
import json
88
import subprocess
9-
from typing import Dict, Iterable, AsyncGenerator
9+
import typing
1010
from pathlib import Path
1111
import shutil
1212

13-
FFPROBE = shutil.which('ffprobe')
13+
FFPROBE = shutil.which("ffprobe")
1414
if not FFPROBE:
15-
raise FileNotFoundError('FFPROBE not found')
15+
raise ImportError("FFPROBE not found")
1616

1717

18-
async def get_meta(files: Iterable[Path]) -> AsyncGenerator[dict, None]: # mypy bug Dict[str, str]
18+
def files2futures(path: Path, suffix: str) -> list:
19+
path = Path(path).expanduser()
20+
if not path.is_dir():
21+
raise NotADirectoryError(path)
22+
return [ffprobe(f) for f in path.iterdir() if f.is_file() and f.suffix in suffix]
1923

20-
futures = [ffprobe(file) for file in files]
21-
for future in asyncio.as_completed(futures):
22-
try:
23-
meta = await future
24-
except asyncio.TimeoutError:
25-
continue
2624

27-
yield meta
25+
def print_meta(meta: typing.Dict[str, typing.Any]):
26+
fn = Path(meta["format"]["filename"])
27+
dur = float(meta["streams"][0]["duration"])
28+
print("{:>40} {:>5.1f}".format(fn.name, dur))
2829

2930

30-
async def ffprobe(file: Path) -> Dict[str, str]:
31-
""" get media metadata """
31+
async def get_meta_gather(
32+
path: Path, suffix: str
33+
) -> typing.List[typing.Dict[str, typing.Any]]:
34+
""" for comparison with asyncio.as_completed"""
35+
futures = files2futures(path, suffix)
36+
metas = await asyncio.gather(*futures)
37+
for meta in metas:
38+
print_meta(meta)
3239

33-
proc = await asyncio.create_subprocess_exec(*[FFPROBE, '-loglevel', 'warning',
34-
'-print_format', 'json',
35-
'-show_streams',
36-
'-show_format',
37-
str(file)],
38-
stdout=asyncio.subprocess.PIPE)
40+
return metas
3941

40-
stdout, _ = await proc.communicate()
4142

42-
return json.loads(stdout.decode('utf8'))
43+
async def get_meta(
44+
path: Path, suffix: str
45+
) -> typing.List[typing.Dict[str, typing.Any]]:
46+
futures = files2futures(path, suffix)
47+
metas = []
48+
for file in asyncio.as_completed(futures):
49+
meta = await file
50+
print_meta(meta)
51+
metas.append(meta)
4352

53+
return metas
4454

45-
def ffprobe_sync(file: Path) -> dict: # mypy bug Dict[str, str]
55+
56+
async def ffprobe(file: Path) -> typing.Dict[str, typing.Any]:
4657
""" get media metadata """
58+
proc = await asyncio.create_subprocess_exec(
59+
*[
60+
FFPROBE,
61+
"-loglevel",
62+
"warning",
63+
"-print_format",
64+
"json",
65+
"-show_streams",
66+
"-show_format",
67+
str(file),
68+
],
69+
stdout=asyncio.subprocess.PIPE
70+
)
71+
72+
stdout, _ = await proc.communicate()
73+
74+
return json.loads(stdout.decode("utf8"))
4775

48-
meta = subprocess.check_output([FFPROBE, '-v', 'warning',
49-
'-print_format', 'json',
50-
'-show_streams',
51-
'-show_format',
52-
str(file)],
53-
text=True)
76+
77+
def ffprobe_sync(file: Path) -> typing.Dict[str, typing.Any]:
78+
""" get media metadata """
79+
meta = subprocess.check_output(
80+
[
81+
FFPROBE,
82+
"-v",
83+
"warning",
84+
"-print_format",
85+
"json",
86+
"-show_streams",
87+
"-show_format",
88+
str(file),
89+
],
90+
universal_newlines=True,
91+
)
5492

5593
return json.loads(meta)

asyncioffmpeg/runner.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import os
2+
import sys
3+
import asyncio
4+
5+
6+
def runner(fun, *args):
7+
"""
8+
Generic asyncio.run() equivalent for Python >= 3.5
9+
"""
10+
if os.name == "nt" and (3, 7) <= sys.version_info < (3, 8):
11+
asyncio.set_event_loop_policy(
12+
asyncio.WindowsProactorEventLoopPolicy() # type: ignore
13+
)
14+
15+
if sys.version_info >= (3, 7):
16+
result = asyncio.run(fun(*args))
17+
else:
18+
if os.name == "nt":
19+
loop = asyncio.ProactorEventLoop()
20+
else:
21+
loop = asyncio.new_event_loop()
22+
asyncio.get_child_watcher().attach_loop(loop)
23+
result = loop.run_until_complete(fun(*args))
24+
loop.close()
25+
26+
return result

demo.py

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,26 @@
3333
import asyncio
3434
import sys
3535
from argparse import ArgumentParser
36-
37-
if sys.version_info < (3, 6):
38-
raise RuntimeError('Python >= 3.6 required')
36+
from asyncioffmpeg.runner import runner
3937

4038

4139
async def coro_worker(i: int, Niter: int, tic: float):
4240
"""coroutine worker"""
4341
for _ in range(Niter):
4442
math.sin(3)
4543

46-
print(f'Coroutine worker {i} done at {time.time()-tic:.2f} sec.')
44+
print("Coroutine worker {} done at {:.2f} sec.".format(i, time.monotonic() - tic))
4745

4846

4947
async def coro(Nworker: int, Niter: int, tic: float):
50-
tasks = [asyncio.create_task(coro_worker(i, Niter, tic)) for i in range(Nworker)]
48+
if sys.version_info >= (3, 7):
49+
tasks = [
50+
asyncio.create_task(coro_worker(i, Niter, tic)) for i in range(Nworker)
51+
]
52+
else:
53+
tasks = [
54+
asyncio.ensure_future(coro_worker(i, Niter, tic)) for i in range(Nworker)
55+
]
5156
await asyncio.wait(tasks)
5257

5358

@@ -60,46 +65,59 @@ def __init__(self, i: int, Niter: int):
6065
self.i = i
6166

6267
def run(self):
63-
tic = time.time()
68+
tic = time.monotonic()
6469
for _ in range(self.Niter):
6570
math.sin(3)
6671

67-
print(f'Thread worker {self.i} done at {time.time()-tic:.2f} sec.')
72+
print(
73+
"Thread worker {} done at {:.2f} sec.".format(
74+
self.i, time.monotonic() - tic
75+
)
76+
)
6877

6978

7079
def mp_worker(i: int, Niter: int, tic: float):
7180
""" multiprocessing worker"""
7281
for _ in range(Niter):
7382
math.sin(3)
7483

75-
print(f'Process worker {i} done at {time.time()-tic:.2f} sec.')
76-
77-
78-
if __name__ == '__main__':
79-
P = ArgumentParser(description='Demonstrate differences between coroutines, threads and proceses.')
80-
P.add_argument('method', help='c: coroutine, t: threading, p: multiprocessing')
81-
P.add_argument('-Nworker', help='number of workers', type=int, default=4)
82-
P.add_argument('-Niter', help='number of loop iterations (arbitrary)', type=int, default=5000000)
84+
print("Process worker {} done at {:.2f} sec.".format(i, time.monotonic() - tic))
85+
86+
87+
if __name__ == "__main__":
88+
P = ArgumentParser(
89+
description="Demonstrate differences between coroutines, threads and proceses."
90+
)
91+
P.add_argument("method", help="c: coroutine, t: threading, p: multiprocessing")
92+
P.add_argument("-Nworker", help="number of workers", type=int, default=4)
93+
P.add_argument(
94+
"-Niter",
95+
help="number of loop iterations (arbitrary)",
96+
type=int,
97+
default=5000000,
98+
)
8399
A = P.parse_args()
84100

85-
if A.method not in ('c', 't', 'p'):
86-
raise ValueError('Method must be one of: c t p')
101+
if A.method not in ("c", "t", "p"):
102+
raise ValueError("Method must be one of: c t p")
87103

88-
tic = time.time()
104+
tic = time.monotonic()
89105
for i in range(A.Nworker):
90-
if A.method == 't':
91-
p = Thread_worker(i, A.Niter)
106+
if A.method == "t":
107+
p = Thread_worker(i, A.Niter) # type: ignore
92108
p.start()
93-
elif A.method == 'p':
94-
p = multiprocessing.Process(target=mp_worker, args=(i, A.Niter, tic)) # type: ignore
109+
elif A.method == "p":
110+
p = multiprocessing.Process( # type: ignore
111+
target=mp_worker, args=(i, A.Niter, tic) # type: ignore
112+
)
95113
p.start()
96-
print(f'started process workert {i}, PID: {p.pid}') # type: ignore
114+
print(
115+
"started process workert {}, PID: {}".format(i, p.pid) # type: ignore
116+
) # type: ignore
97117

98-
if A.method == 'c':
99-
if sys.version_info < (3, 7):
100-
raise RuntimeError('Python >= 3.7 required for this example')
101-
asyncio.run(coro(A.Nworker, A.Niter, tic))
118+
if A.method == "c":
119+
runner(coro, A.Nworker, A.Niter, tic)
102120
else:
103121
p.join()
104122

105-
print(f'{time.time()-tic:.2f} sec.')
123+
print("{:.2f} sec.".format(time.monotonic() - tic))

0 commit comments

Comments
 (0)