Feature Tracker with Motion Estimation¶
This example demonstrates the FeatureTracker node combined with motion estimation. It detects and tracks features between consecutive frames using optical flow, assigning each feature a unique ID. The camera's motion is estimated from the tracked features, and the estimated direction (e.g., Up, Down, Left, Right, Rotating) is displayed on screen.
The Feature Detector example only detects features without estimating motion.
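At its core, the direction label comes from the average optical-flow vector of the tracked features: mostly horizontal flow maps to Left or Right, mostly vertical flow maps to Up or Down, and anything below a magnitude threshold counts as standing still. Note the inverted signs: features drift across the image opposite to the camera's motion. Below is a minimal, self-contained sketch of just this classification step; the threshold and the sample flow values are illustrative only, and the full script further down integrates the same logic with smoothing and rotation handling.
import numpy as np

def classify_motion(flows, motion_threshold=0.3):
    # flows: (N, 2) array of per-feature displacements (dx, dy) in pixels
    avg_flow = flows.mean(axis=0)
    if np.linalg.norm(avg_flow) <= motion_threshold:
        return "Camera Staying Still"
    if abs(avg_flow[0]) > abs(avg_flow[1]):
        # Features moving left on screen imply the camera pans right
        return "Right" if avg_flow[0] < 0 else "Left"
    return "Down" if avg_flow[1] < 0 else "Up"

print(classify_motion(np.array([[-0.8, 0.1], [-0.6, -0.1]])))  # Right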
Demo¶
Setup¶
Please run the install script to download all required dependencies. Please note that this script must be run from within the cloned depthai-python repository, so first clone the repository and then run the script:
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, please follow the installation guide.
Source code¶
Also available on GitHub
import numpy as np
import cv2
from collections import deque
import depthai as dai
class CameraMotionEstimator:
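    # Estimates the dominant camera motion (translation direction or rotation)
    # from per-feature optical flow, smoothing successive flow estimates with
    # an exponential filter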
def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
self.last_avg_flow = np.array([0.0, 0.0])
self.filter_weight = filter_weight
self.motion_threshold = motion_threshold
self.rotation_threshold = rotation_threshold
def estimate_motion(self, feature_paths):
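        # Returns the most prominent motion label and an approximate
        # vanishing point computed from the tracked feature paths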
most_prominent_motion = "Camera Staying Still"
max_magnitude = 0.0
avg_flow = np.array([0.0, 0.0])
total_rotation = 0.0
vanishing_point = np.array([0.0, 0.0])
        num_features = len(feature_paths)
        num_moved = 0  # features that have at least two tracked positions
        print(f"Number of features: {num_features}")
        if num_features == 0:
            return most_prominent_motion, vanishing_point
        for path in feature_paths.values():
            if len(path) >= 2:
                src = np.array([path[-2].x, path[-2].y])
                dst = np.array([path[-1].x, path[-1].y])
                avg_flow += dst - src
                # Extrapolate one step ahead; averaging these points gives a
                # rough vanishing-point estimate
                motion_vector = dst + (dst - src)
                vanishing_point += motion_vector
                rotation = np.arctan2(dst[1] - src[1], dst[0] - src[0])
                total_rotation += rotation
                num_moved += 1
        if num_moved == 0:
            return most_prominent_motion, vanishing_point
        avg_flow /= num_moved
        avg_rotation = total_rotation / num_moved
        vanishing_point /= num_moved
print(f"Average Flow: {avg_flow}")
print(f"Average Rotation: {avg_rotation}")
avg_flow = (self.filter_weight * self.last_avg_flow +
(1 - self.filter_weight) * avg_flow)
self.last_avg_flow = avg_flow
flow_magnitude = np.linalg.norm(avg_flow)
rotation_magnitude = abs(avg_rotation)
if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
if abs(avg_flow[0]) > abs(avg_flow[1]):
most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
else:
most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
max_magnitude = flow_magnitude
if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
most_prominent_motion = 'Rotating'
return most_prominent_motion, vanishing_point
class FeatureTrackerDrawer:
lineColor = (200, 0, 200)
pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # Magenta for the vanishing point
circleRadius = 2
maxTrackedFeaturesPathLength = 30
trackedFeaturesPathLength = 10
trackedIDs = None
trackedFeaturesPath = None
def trackFeaturePath(self, features):
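        # Append each feature's newest position to its path, trim paths to
        # trackedFeaturesPathLength points, and drop features no longer tracked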
newTrackedIDs = set()
for currentFeature in features:
currentID = currentFeature.id
newTrackedIDs.add(currentID)
if currentID not in self.trackedFeaturesPath:
self.trackedFeaturesPath[currentID] = deque()
path = self.trackedFeaturesPath[currentID]
path.append(currentFeature.position)
            while len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
path.popleft()
self.trackedFeaturesPath[currentID] = path
featuresToRemove = set()
for oldId in self.trackedIDs:
if oldId not in newTrackedIDs:
featuresToRemove.add(oldId)
        for featureId in featuresToRemove:
            self.trackedFeaturesPath.pop(featureId)
self.trackedIDs = newTrackedIDs
def drawVanishingPoint(self, img, vanishing_point):
cv2.circle(img, (int(vanishing_point[0]), int(vanishing_point[1])), self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)
# Define color mapping for directions
direction_colors = {
"Up": (0, 255, 255), # Yellow
"Down": (0, 255, 0), # Green
"Left": (255, 0, 0), # Blue
"Right": (0, 0, 255), # Red
}
def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
# Get the appropriate point color based on the prominent motion
if prominent_motion in self.direction_colors:
point_color = self.direction_colors[prominent_motion]
else:
point_color = self.pointColor
for featurePath in self.trackedFeaturesPath.values():
path = featurePath
for j in range(len(path) - 1):
src = (int(path[j].x), int(path[j].y))
dst = (int(path[j + 1].x), int(path[j + 1].y))
cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)
j = len(path) - 1
cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, point_color, -1, cv2.LINE_AA, 0)
# Draw the direction text on the image
if prominent_motion:
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2
text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
text_x = (img.shape[1] - text_size[0]) // 2
text_y = text_size[1] + 20 # 20 pixels from the top
# Get the appropriate color based on the prominent motion
text_color = self.direction_colors.get(prominent_motion, (255, 255, 255)) # Default to white
# Draw the text
cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)
# Draw vanishing point if provided
if vanishing_point is not None:
self.drawVanishingPoint(img, vanishing_point)
def __init__(self, windowName):
self.windowName = windowName
cv2.namedWindow(windowName)
self.trackedIDs = set()
self.trackedFeaturesPath = dict()
def create_pipeline():
pipeline = dai.Pipeline()
# Create a MonoCamera node and set its properties
mono_left = pipeline.create(dai.node.MonoCamera)
mono_left.setCamera("left")
mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
mono_left.setFps(15)
# Create a FeatureTracker node
feature_tracker_left = pipeline.create(dai.node.FeatureTracker)
# Create XLinkOut nodes for output streams
xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
xout_passthrough_left = pipeline.create(dai.node.XLinkOut)
# Set stream names
xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
xout_passthrough_left.setStreamName("passthroughLeft")
# Allocate resources for improved performance
num_shaves = 2
num_memory_slices = 2
feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)
# Link the nodes
mono_left.out.link(feature_tracker_left.inputImage)
feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)
return pipeline
if __name__ == '__main__':
pipeline = create_pipeline()
with dai.Device(pipeline) as device:
output_features_left_queue = device.getOutputQueue(
"trackedFeaturesLeft", maxSize=4, blocking=False)
passthrough_image_left_queue = device.getOutputQueue(
"passthroughLeft", maxSize=4, blocking=False)
left_window_name = "Left"
left_feature_drawer = FeatureTrackerDrawer(left_window_name)
camera_estimator_left = CameraMotionEstimator(
filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)
while True:
in_passthrough_frame_left = passthrough_image_left_queue.get()
passthrough_frame_left = in_passthrough_frame_left.getFrame()
left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)
tracked_features_left = output_features_left_queue.get().trackedFeatures
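            # Motion is estimated from the paths accumulated so far; the newly
            # tracked features are appended to the paths only afterwards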
motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
left_feature_drawer.trackedFeaturesPath)
left_feature_drawer.trackFeaturePath(tracked_features_left)
left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)
print("Motions:", motions_left)
cv2.imshow(left_window_name, left_frame)
if cv2.waitKey(1) == ord('q'):
break