Demuxing Synchronized Script Outputs¶
This example uses the DepthAI Sync node together with the MessageDemux node to synchronize and then demultiplex the outputs of two separate Script nodes. Each Script node generates data buffers at a different interval (one every 1 s, the other every 0.3 s). The Sync node first groups buffers whose timestamps fall within the synchronization threshold, and the MessageDemux node then splits each synchronized group back into separate streams.
Demo¶
~/depthai-python/examples/Sync $ python3 demux_message_group.py
Start
Buffer 1 timestamp: 0:00:03.581073
Buffer 2 timestamp: 0:00:03.591084
----------
Buffer 1 timestamp: 0:00:04.583100
Buffer 2 timestamp: 0:00:04.497079
----------
Buffer 1 timestamp: 0:00:06.587174
Buffer 2 timestamp: 0:00:06.611154
----------
Buffer 1 timestamp: 0:00:07.589147
Buffer 2 timestamp: 0:00:07.517125
----------
Buffer 1 timestamp: 0:00:09.593076
Buffer 2 timestamp: 0:00:09.631089
----------
Buffer 1 timestamp: 0:00:10.595106
Buffer 2 timestamp: 0:00:10.537082
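As a quick sanity check on the output above, the following sketch parses the printed timestamps and confirms that each pair lies within the 100 ms threshold configured in the example. It runs on the host in plain Python and does not need a device. The helper name `parse_timestamp` and the constant names are illustrative, not part of the DepthAI API.

```python
from datetime import timedelta

# Timestamps copied from the demo output above, as (Buffer 1, Buffer 2) pairs.
DEMO_PAIRS = [
    ("0:00:03.581073", "0:00:03.591084"),
    ("0:00:04.583100", "0:00:04.497079"),
    ("0:00:06.587174", "0:00:06.611154"),
    ("0:00:07.589147", "0:00:07.517125"),
    ("0:00:09.593076", "0:00:09.631089"),
    ("0:00:10.595106", "0:00:10.537082"),
]

# Same value the example passes to setSyncThreshold().
THRESHOLD = timedelta(milliseconds=100)


def parse_timestamp(text):
    """Parse an 'H:MM:SS.ffffff' string (the form str(timedelta) produces)."""
    hours, minutes, seconds = text.split(":")
    whole, _, frac = seconds.partition(".")
    micros = int(frac.ljust(6, "0")) if frac else 0
    return timedelta(hours=int(hours), minutes=int(minutes),
                     seconds=int(whole), microseconds=micros)


# Absolute difference between the two buffers of each printed pair.
deltas = [abs(parse_timestamp(a) - parse_timestamp(b)) for a, b in DEMO_PAIRS]

for delta in deltas:
    status = "within" if delta <= THRESHOLD else "outside"
    print(f"{delta.total_seconds() * 1000:.3f} ms ({status} threshold)")
```

Every pair in this particular run differs by less than 100 ms, which is consistent with the Sync node only emitting groups whose timestamps fall inside the threshold.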
Setup¶
Please run the install script to download all required dependencies. Note that this script must be run from within a git context, so you have to clone the depthai-python repository first and then run the script:
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, see the installation guide.
Source code¶
Also available on GitHub
import depthai as dai
import time
from datetime import timedelta

pipeline = dai.Pipeline()

# First Script node: sends a 512-byte buffer once per second
script1 = pipeline.create(dai.node.Script)
script1.setScript("""
from time import sleep

while True:
    sleep(1)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(0, 128)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
""")

# Second Script node: sends a 512-byte buffer every 0.3 seconds
script2 = pipeline.create(dai.node.Script)
script2.setScript("""
from time import sleep

while True:
    sleep(0.3)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(128, 256)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
""")

# Sync node: groups messages whose timestamps are within 100 ms of each other
sync = pipeline.create(dai.node.Sync)
sync.setSyncThreshold(timedelta(milliseconds=100))

# MessageDemux node: splits each synchronized group back into separate streams
demux = pipeline.create(dai.node.MessageDemux)

xout1 = pipeline.create(dai.node.XLinkOut)
xout1.setStreamName("xout1")
xout2 = pipeline.create(dai.node.XLinkOut)
xout2.setStreamName("xout2")

# Script nodes -> Sync -> MessageDemux -> XLinkOut
script1.outputs["out"].link(sync.inputs["s1"])
script2.outputs["out"].link(sync.inputs["s2"])
sync.out.link(demux.input)
demux.outputs["s1"].link(xout1.input)
demux.outputs["s2"].link(xout2.input)

with dai.Device(pipeline) as device:
    print("Start")
    q1 = device.getOutputQueue("xout1", maxSize=10, blocking=True)
    q2 = device.getOutputQueue("xout2", maxSize=10, blocking=True)
    while True:
        # Blocking reads: one demultiplexed buffer from each stream
        bufS1 = q1.get()
        bufS2 = q2.get()
        print(f"Buffer 1 timestamp: {bufS1.getTimestamp()}")
        print(f"Buffer 2 timestamp: {bufS2.getTimestamp()}")
        print("----------")
        time.sleep(0.2)
Also available on GitHub
#include <chrono>
#include <iostream>
#include <thread>  // std::this_thread::sleep_for

#include "depthai/depthai.hpp"

int main() {
    dai::Pipeline pipeline;

    // First Script node: sends a 512-byte buffer once per second
    auto script1 = pipeline.create<dai::node::Script>();
    script1->setScript(
        R"SCRPT(
from time import sleep

while True:
    sleep(1)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(0, 128)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
)SCRPT");

    // Second Script node: sends a 512-byte buffer every 0.3 seconds
    auto script2 = pipeline.create<dai::node::Script>();
    script2->setScript(
        R"SCRPT(
from time import sleep

while True:
    sleep(0.3)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(128, 256)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
)SCRPT");

    // Sync node: groups messages whose timestamps are within 100 ms of each other
    auto sync = pipeline.create<dai::node::Sync>();
    sync->setSyncThreshold(std::chrono::milliseconds(100));

    // MessageDemux node: splits each synchronized group back into separate streams
    auto demux = pipeline.create<dai::node::MessageDemux>();

    auto xout1 = pipeline.create<dai::node::XLinkOut>();
    xout1->setStreamName("xout1");
    auto xout2 = pipeline.create<dai::node::XLinkOut>();
    xout2->setStreamName("xout2");

    // Script nodes -> Sync -> MessageDemux -> XLinkOut
    script1->outputs["out"].link(sync->inputs["s1"]);
    script2->outputs["out"].link(sync->inputs["s2"]);
    sync->out.link(demux->input);
    demux->outputs["s1"].link(xout1->input);
    demux->outputs["s2"].link(xout2->input);

    dai::Device device(pipeline);
    std::cout << "Start" << std::endl;
    auto queue1 = device.getOutputQueue("xout1", 10, true);
    auto queue2 = device.getOutputQueue("xout2", 10, true);
    while(true) {
        // Blocking reads: one demultiplexed buffer from each stream
        auto bufS1 = queue1->get<dai::Buffer>();
        auto bufS2 = queue2->get<dai::Buffer>();
        std::cout << "Buffer 1 timestamp: " << bufS1->getTimestamp().time_since_epoch().count() << std::endl;
        std::cout << "Buffer 2 timestamp: " << bufS2->getTimestamp().time_since_epoch().count() << std::endl;
        std::cout << "----------" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}
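The payload expression used by both Script nodes can be checked on the host in plain Python. The sketch below shows that `4 * [...]` repeats the 128-element list four times rather than multiplying its values, so each payload is exactly 512 bytes, matching `Buffer(512)`. The variable names `payload_1` and `payload_2` are illustrative only.

```python
# Same expressions as in the two on-device scripts.
payload_1 = bytes(4 * [i for i in range(0, 128)])
payload_2 = bytes(4 * [i for i in range(128, 256)])

# List repetition: 128 values repeated 4 times gives 512 bytes per payload.
print(len(payload_1), len(payload_2))

# The first 128 bytes repeat in every following 128-byte block.
print(payload_1[:128] == payload_1[128:256])

# The two scripts use disjoint value ranges, so their payloads are easy to tell apart.
print(min(payload_1), max(payload_1), min(payload_2), max(payload_2))
```

Because the value ranges do not overlap, inspecting a single byte of a received buffer would be enough to tell which Script node produced it.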
How it Works¶
Initialize a DepthAI pipeline.
Create two Script nodes, with each script generating and sending data buffers at different intervals.
Set up a Sync node with a synchronization threshold.
Integrate a MessageDemux node to separate the synchronized data streams.
Link the outputs of the Script nodes to the Sync node, and then from the Sync node to the MessageDemux node.
Start the pipeline and continuously receive demultiplexed data from the MessageDemux node.
Print the timestamps of the demultiplexed data for comparison.
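The demultiplexing step in the list above can be pictured with a small host-only model. This is a conceptual sketch, not the DepthAI API: it assumes a synchronized group can be treated as a dictionary keyed by the Sync input names (`s1`, `s2`), and the `demux` function and `outputs` queues are hypothetical stand-ins for the MessageDemux node and its outputs. The timestamps are taken from the first two pairs of the demo output.

```python
from collections import deque
from datetime import timedelta

# Hypothetical stand-in for synchronized message groups: each dict maps a
# Sync input name to its message (here reduced to just a timestamp).
groups = [
    {"s1": timedelta(seconds=3, microseconds=581073),
     "s2": timedelta(seconds=3, microseconds=591084)},
    {"s1": timedelta(seconds=4, microseconds=583100),
     "s2": timedelta(seconds=4, microseconds=497079)},
]


def demux(group, outputs):
    """Append every named message in `group` to the output queue of the same name."""
    for name, message in group.items():
        outputs[name].append(message)


# One queue per demux output, named after the Sync inputs.
outputs = {"s1": deque(), "s2": deque()}
for group in groups:
    demux(group, outputs)

# Reading the two queues in lockstep recovers the synchronized pairs.
for ts1, ts2 in zip(outputs["s1"], outputs["s2"]):
    print(f"Buffer 1 timestamp: {ts1}")
    print(f"Buffer 2 timestamp: {ts2}")
    print("----------")
```

In this model, reading one item from each queue per iteration keeps the pairs aligned, which mirrors how the example reads once from `q1` and once from `q2` in each loop pass.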