Video Encoded Frame Type¶
This example showcases how to compress and print the sequence of encoded frame types (I, P, B) encountered during the encoding process.
Example script output¶
~/depthai-python/examples$ python3 video_encoding.py
Press Ctrl+C to stop encoding...
To view the encoded data, convert the stream file (.h265) into a video file (.mp4) using a command below:
ffmpeg -framerate 30 -i video.h265 -c copy video.mp4
1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,18P
This output indicates the script’s real-time video encoding process, and it compresses the sequence of frame types encountered. After stopping the script, it provides instructions to convert the raw H.265 stream into a playable MP4 video file. The final line shows a compressed representation of the frame types sequence, indicating the count of consecutive P, B, and I frames.
Setup¶
Please run the install script to download all required dependencies. Please note that this script must be ran from git context, so you have to download the depthai-python repository first and then run the script
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, please follow installation guide
Source code¶
Also available on GitHub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | #!/usr/bin/env python3
import depthai as dai
def frametype2str(ft):
if ft == dai.EncodedFrame.FrameType.I:
return "I"
elif ft == dai.EncodedFrame.FrameType.P:
return "P"
elif ft == dai.EncodedFrame.FrameType.B:
return "B"
def compress(ls):
curr = ls[0]
count = 1
res = []
for i in range(1, len(ls)):
if ls[i] == curr:
count += 1
else:
res.append((count, curr))
curr = ls[i]
count = 1
res.append((count, curr))
return res
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and output
camRgb = pipeline.create(dai.node.ColorCamera)
videoEnc = pipeline.create(dai.node.VideoEncoder)
xout = pipeline.create(dai.node.XLinkOut)
xout.setStreamName('h265')
# Properties
camRgb.setBoardSocket(dai.CameraBoardSocket.CAM_A)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
videoEnc.setDefaultProfilePreset(30, dai.VideoEncoderProperties.Profile.H265_MAIN)
# Linking
camRgb.video.link(videoEnc.input)
videoEnc.out.link(xout.input)
frametypes = []
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
# Output queue will be used to get the encoded data from the output defined above
q = device.getOutputQueue(name="h265", maxSize=30, blocking=True)
# The .h265 file is a raw stream file (not playable yet)
with open('video.h265', 'wb') as videoFile:
print("Press Ctrl+C to stop encoding...")
try:
while True:
h265Packet = q.get() # Blocking call, will wait until a new data has arrived
frametypes.append(frametype2str(h265Packet.getFrameType()))
h265Packet.getData().tofile(videoFile) # Appends the packet data to the opened file
except KeyboardInterrupt:
# Keyboard interrupt (Ctrl + C) detected
pass
print("To view the encoded data, convert the stream file (.h265) into a video file (.mp4) using a command below:")
print("ffmpeg -framerate 30 -i video.h265 -c copy video.mp4")
print(",".join([f"{c}{f}" for c, f in compress(frametypes)]))
|