Video Encoded Frame Type

This example showcases how to compress and print the sequence of encoded frame types (I, P, B) encountered during the encoding process.

Example script output

~/depthai-python/examples$ python3 video_encoding.py
Press Ctrl+C to stop encoding...
To view the encoded data, convert the stream file (.h265) into a video file (.mp4) using a command below:
ffmpeg -framerate 30 -i video.h265 -c copy video.mp4

1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,29P,1I,18P

This output indicates the script’s real-time video encoding process, and it compresses the sequence of frame types encountered. After stopping the script, it provides instructions to convert the raw H.265 stream into a playable MP4 video file. The final line shows a compressed representation of the frame types sequence, indicating the count of consecutive P, B, and I frames.

Setup

Please run the install script to download all required dependencies. Please note that this script must be ran from git context, so you have to download the depthai-python repository first and then run the script

git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py

For additional information, please follow installation guide

Source code

Also available on GitHub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3

import depthai as dai

def frametype2str(ft):
    if ft == dai.EncodedFrame.FrameType.I:
        return "I"
    elif ft == dai.EncodedFrame.FrameType.P:
        return "P"
    elif ft == dai.EncodedFrame.FrameType.B:
        return "B"

def compress(ls):
    curr = ls[0]
    count = 1
    res = []
    for i in range(1, len(ls)):
        if ls[i] == curr:
            count += 1
        else:
            res.append((count, curr))
            curr = ls[i]
            count = 1
    res.append((count, curr))
    return res


# Create pipeline
pipeline = dai.Pipeline()

# Define sources and output
camRgb = pipeline.create(dai.node.ColorCamera)
videoEnc = pipeline.create(dai.node.VideoEncoder)
xout = pipeline.create(dai.node.XLinkOut)

xout.setStreamName('h265')

# Properties
camRgb.setBoardSocket(dai.CameraBoardSocket.CAM_A)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
videoEnc.setDefaultProfilePreset(30, dai.VideoEncoderProperties.Profile.H265_MAIN)

# Linking
camRgb.video.link(videoEnc.input)
videoEnc.out.link(xout.input)

frametypes = []
# Connect to device and start pipeline
with dai.Device(pipeline) as device:

    # Output queue will be used to get the encoded data from the output defined above
    q = device.getOutputQueue(name="h265", maxSize=30, blocking=True)

    # The .h265 file is a raw stream file (not playable yet)
    with open('video.h265', 'wb') as videoFile:
        print("Press Ctrl+C to stop encoding...")
        try:
            while True:
                h265Packet = q.get()  # Blocking call, will wait until a new data has arrived
                frametypes.append(frametype2str(h265Packet.getFrameType()))
                h265Packet.getData().tofile(videoFile)  # Appends the packet data to the opened file
        except KeyboardInterrupt:
            # Keyboard interrupt (Ctrl + C) detected
            pass

    print("To view the encoded data, convert the stream file (.h265) into a video file (.mp4) using a command below:")
    print("ffmpeg -framerate 30 -i video.h265 -c copy video.mp4")

print(",".join([f"{c}{f}" for c, f in compress(frametypes)]))

Got questions?

Head over to Discussion Forum for technical support or any other questions you might have.