I have the following method to create a dummy video file:
def create_dummy_mp4_video() -> None:
    """Generate a short black test clip with silent audio using ffmpeg.

    The clip is 100x100 pixels at 10 fps, lasts 1 second, carries a silent
    stereo audio track, and is written to ``output.mp4`` (relative to the
    current working directory), overwriting any existing file.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status. The message
            contains the exit code and ffmpeg's stderr output.
    """
    # Pass the arguments as a list so no shell-style string splitting is needed.
    cmd = [
        "ffmpeg",
        "-y",  # rewrite if exists
        "-f", "lavfi", "-i", "color=size=100x100:rate=10:color=black",  # blank video
        "-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100",  # silent audio
        "-t", "1",  # video duration, seconds
        "output.mp4",  # file name
    ]
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=False,
    )
    if proc.returncode != 0:
        # Surface ffmpeg's own diagnostics instead of an empty exception.
        raise RuntimeError(
            f"ffmpeg exited with code {proc.returncode}: "
            f"{proc.stderr.decode(errors='replace')}"
        )
@dataclass(frozen=True)
class FakeVideo:
    """Immutable bundle of a video's raw bytes and the metadata describing it."""

    # Raw contents of the video file.
    body: bytes

    # Picture dimensions.
    width: int
    height: int

    # Frame rate (frames per second).
    fps: int

    # Length of ``body`` in bytes, as filled in by video().
    size: int

    # Number of frames in the clip.
    frames: int

    # Clip duration, in seconds.
    length_s: int
def video() -> FakeVideo:
    """Create the dummy clip on disk and return its bytes plus expected metadata.

    Returns:
        FakeVideo: the contents of ``output.mp4`` in the current working
        directory, together with the width, height, frame rate, byte size,
        frame count and duration the clip is expected to have.
    """
    # These values mirror the ones hard-coded in create_dummy_mp4_video().
    w, h, fps, sec, filename = 100, 100, 10, 1, "output.mp4"
    create_dummy_mp4_video()
    video_path = os.path.join(os.getcwd(), filename)
    with open(video_path, "rb") as file:
        body = file.read()
    size = len(body)
    # Frame count is rate x duration; the previous ``fps // sec`` was only
    # correct for a one-second clip.
    frames = fps * sec
    return FakeVideo(
        body=body, width=w, height=h, fps=fps,
        size=size, frames=frames, length_s=sec,
    )
Then I want to extract a frame at a specific time. I did it like this:
async def run_shell_command(frame_millisecond, data: bytes) -> bytes:
    """Dump *data* into a temporary file and ask ffmpeg for one PNG frame.

    Args:
        frame_millisecond: seek position, in milliseconds.
        data: contents of the video file.

    Returns:
        The bytes ffmpeg wrote to stdout.
    """
    async with aiofiles.tempfile.NamedTemporaryFile("wb") as file:
        await file.write(data)
        # NOTE(review): the file is neither flushed nor closed at this point,
        # so ffmpeg may open file.name before all of the data is on disk.
        ffmpeg_args = (
            "-i", file.name,
            "-ss", f"{frame_millisecond}ms",  # seek the position to the specific millisecond
            "-vframes", "1",  # only handle one video frame
            "-c:v", "png",  # select the output encoder
            "-f", "image2pipe", "-",  # force output file to stdout
        )
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            *ffmpeg_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()

        # A clean exit is logged quietly, anything else as a warning.
        if proc.returncode == 0:
            level = logging.DEBUG
        else:
            level = logging.WARN
        LOGGER.log(level, f"[cmd exited with {proc.returncode}]")

        if stderr:
            stderr_message = f"[stderr]{stderr.decode()}"
            print(level, stderr_message)
            LOGGER.log(level, stderr_message)
        return stdout
async def runner():
    """Build the dummy clip, grab the frame at its midpoint and check it is a PNG."""
    v = video()
    # Half of the clip duration, converted from seconds to milliseconds.
    midpoint_ms = int(v.length_s / 2 * 1000)
    res = await run_shell_command(midpoint_ms, v.body)
    assert isinstance(res, bytes)
    assert imghdr.what(h=res, file=None) == "png"


# asyncio.run() creates and closes its own event loop. Calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
asyncio.run(runner())
This code fails with the following error:
/tmp/tmpzo786lfg: Invalid data found when processing input
Please help me find the problem with my code. During my investigation I found that it works if I change the size of the video like this:
f"-f lavfi -i color=size=1280x720:rate=25:color=black " # blank video
but I want to be able to process any video.
I use ffmpeg 4.3.3-0+deb11u1
It looks like you have to make sure the data is written to the temporary file, before executing FFmpeg.
I don't have any experience with asyncio and aiofiles and I am running Windows 10, so I am not sure about the Linux behavior...
I tried to add await file.flush() after file.write(data), but the FFmpeg execution result was "Permission denied".
I solved it using the solution from the following post:
Add delete=False argument to tempfile.NamedTemporaryFile:
async with aiofiles.tempfile.NamedTemporaryFile("wb", delete=False) as file:
Add await file.close() after await file.write(data).
Closing the file is used for making sure that all the data is written to the file, before executing FFmpeg.
Add os.unlink(file.name) before return stdout.
Complete code:
import subprocess
import asyncio
from dataclasses import dataclass
import shlex
import aiofiles
import os
import logging
import imghdr
def create_dummy_mp4_video() -> None:
    """Generate a short black test clip with silent audio using ffmpeg.

    The clip is 100x100 pixels at 10 fps, lasts 1 second, carries a silent
    stereo audio track, and is written to ``output.mp4`` (relative to the
    current working directory), overwriting any existing file.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status. The message
            contains the exit code and ffmpeg's stderr output.
    """
    # Pass the arguments as a list so no shell-style string splitting is needed.
    cmd = [
        "ffmpeg",
        "-y",  # rewrite if exists
        "-f", "lavfi", "-i", "color=size=100x100:rate=10:color=black",  # blank video
        "-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100",  # silent audio
        "-t", "1",  # video duration, seconds
        "output.mp4",  # file name
    ]
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        # Capture stderr (rather than discarding it) so a failure can be reported.
        stderr=subprocess.PIPE,
        shell=False,
    )
    if proc.returncode != 0:
        # Surface ffmpeg's own diagnostics instead of an empty exception.
        raise RuntimeError(
            f"ffmpeg exited with code {proc.returncode}: "
            f"{proc.stderr.decode(errors='replace')}"
        )
@dataclass(frozen=True)
class FakeVideo:
    """Read-only record pairing a clip's raw bytes with its descriptive metadata."""

    body: bytes    # raw contents of the video file
    width: int     # picture width
    height: int    # picture height
    fps: int       # frame rate, frames per second
    size: int      # length of ``body`` in bytes, as filled in by video()
    frames: int    # number of frames in the clip
    length_s: int  # clip duration, in seconds
def video() -> FakeVideo:
    """Create the dummy clip on disk and return its bytes plus expected metadata.

    Returns:
        FakeVideo: the contents of ``output.mp4`` in the current working
        directory, together with the width, height, frame rate, byte size,
        frame count and duration the clip is expected to have.
    """
    # These values mirror the ones hard-coded in create_dummy_mp4_video().
    w, h, fps, sec, filename = 100, 100, 10, 1, "output.mp4"
    create_dummy_mp4_video()
    video_path = os.path.join(os.getcwd(), filename)
    with open(video_path, "rb") as file:
        body = file.read()
    size = len(body)
    # Frame count is rate x duration; the previous ``fps // sec`` was only
    # correct for a one-second clip.
    frames = fps * sec
    return FakeVideo(
        body=body, width=w, height=h, fps=fps,
        size=size, frames=frames, length_s=sec,
    )
async def run_shell_command(frame_millisecond, data: bytes) -> bytes:
    """Extract a single PNG-encoded frame from an in-memory video using ffmpeg.

    The video bytes are written to a temporary file, which is closed before
    ffmpeg is started and removed again once ffmpeg has finished (or failed
    to start).

    Args:
        frame_millisecond: seek position, in milliseconds.
        data: complete contents of the video file.

    Returns:
        The bytes ffmpeg wrote to stdout. A non-zero ffmpeg exit status is not
        raised; ffmpeg's stderr output is printed instead.
    """
    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file/23212515
    # delete=False keeps the file on disk after it has been closed, so ffmpeg
    # can open it by name; it is removed explicitly below.
    async with aiofiles.tempfile.NamedTemporaryFile("wb", delete=False) as file:
        await file.write(data)
        # Close the file before executing FFmpeg so all data is written out.
        await file.close()
        try:
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg",
                "-i",
                file.name,
                "-ss",
                f"{frame_millisecond}ms",  # seek the position to the specific millisecond
                "-vframes", "1",  # only handle one video frame
                "-c:v", "png",  # select the output encoder
                "-f", "image2pipe", "-",  # force output file to stdout
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
        finally:
            # Unlink is required because delete=False was used; doing it in
            # ``finally`` avoids leaking the file when ffmpeg cannot be run.
            os.unlink(file.name)
        level = logging.DEBUG if proc.returncode == 0 else logging.WARN
        if stderr:
            # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8.
            print(level, f"[stderr]{stderr.decode(errors='replace')}")
        return stdout
async def runner():
    """Build the dummy clip, grab the frame at its midpoint and check it is a PNG."""
    v = video()
    # Half of the clip duration, converted from seconds to milliseconds.
    midpoint_ms = int(v.length_s / 2 * 1000)
    res = await run_shell_command(midpoint_ms, v.body)
    assert isinstance(res, bytes)
    assert imghdr.what(h=res, file=None) == "png"


# asyncio.run() creates and closes its own event loop. Calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
asyncio.run(runner())
Notes:
I disabled the LOGGER calls because I couldn't find the LOGGER module.

If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With