Feature Tracker with Motion Estimation

This example demonstrates the FeatureTracker node combined with motion estimation. It detects and tracks features between consecutive frames using optical flow, assigning each feature a unique ID. The camera's motion is estimated from the tracked features, and the estimated direction (e.g., Up, Down, Left, Right, Rotating) is displayed on screen.

The Feature Detector example only detects features without estimating motion.
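
The core idea is that features appear to move in the opposite direction of the camera, so averaging the per-feature flow vectors and picking the dominant axis yields a direction label. Below is a minimal sketch of that mapping (the classify_motion helper is illustrative, not part of the DepthAI API; the full source further down applies the same logic with filtering and a rotation check):

import numpy as np

def classify_motion(avg_flow, threshold=0.3):
    # Features flow opposite to the camera, so net flow to the
    # left means the camera is moving right.
    if np.linalg.norm(avg_flow) <= threshold:
        return "Camera Staying Still"
    if abs(avg_flow[0]) > abs(avg_flow[1]):
        return "Right" if avg_flow[0] < 0 else "Left"
    return "Down" if avg_flow[1] < 0 else "Up"

print(classify_motion(np.array([-1.2, 0.4])))  # -> Right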

Demo

Feature Tracker with Motion Estimation Demo

Setup

Please run the install script to download all required dependencies. Note that this script must be run from within the cloned repository, so first clone the depthai-python repository and then run the script:

git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py

For additional information, please follow the installation guide.

Source code

Also available on GitHub

import numpy as np
import cv2
from collections import deque
import depthai as dai


class CameraMotionEstimator:
    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
        self.last_avg_flow = np.array([0.0, 0.0])
        self.filter_weight = filter_weight
        self.motion_threshold = motion_threshold
        self.rotation_threshold = rotation_threshold

    def estimate_motion(self, feature_paths):
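        """Estimate the dominant camera motion from the tracked feature paths."""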
        most_prominent_motion = "Camera Staying Still"
        max_magnitude = 0.0
        avg_flow = np.array([0.0, 0.0])
        total_rotation = 0.0
        vanishing_point = np.array([0.0, 0.0])
        num_features = len(feature_paths)

        print(f"Number of features: {num_features}")

        if num_features == 0:
            return most_prominent_motion, vanishing_point

        for path in feature_paths.values():
            if len(path) >= 2:
                src = np.array([path[-2].x, path[-2].y])
                dst = np.array([path[-1].x, path[-1].y])
                avg_flow += dst - src
                motion_vector = dst + (dst - src)
                vanishing_point += motion_vector
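                # Approximate the per-feature rotation by the flow direction angle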
                rotation = np.arctan2(dst[1] - src[1], dst[0] - src[0])
                total_rotation += rotation

        avg_flow /= num_features
        avg_rotation = total_rotation / num_features
        vanishing_point /= num_features

        print(f"Average Flow: {avg_flow}")
        print(f"Average Rotation: {avg_rotation}")

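        # Low-pass filter the flow to smooth out frame-to-frame jitter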
        avg_flow = (self.filter_weight * self.last_avg_flow +
                    (1 - self.filter_weight) * avg_flow)
        self.last_avg_flow = avg_flow

        flow_magnitude = np.linalg.norm(avg_flow)
        rotation_magnitude = abs(avg_rotation)

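        # Features flow opposite to the camera, hence the inverted signs below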
        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
            if abs(avg_flow[0]) > abs(avg_flow[1]):
                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
            else:
                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
            max_magnitude = flow_magnitude

        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
            most_prominent_motion = 'Rotating'

        return most_prominent_motion, vanishing_point


class FeatureTrackerDrawer:

    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # Magenta color for the vanishing point
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    def trackFeaturePath(self, features):
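        """Append the latest feature positions to their per-ID motion paths."""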

        newTrackedIDs = set()
        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            if currentID not in self.trackedFeaturesPath:
                self.trackedFeaturesPath[currentID] = deque()

            path = self.trackedFeaturesPath[currentID]
            path.append(currentFeature.position)

            # Trim the path in place; the deque is already stored in the dict
            while len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
                path.popleft()

        featuresToRemove = set()
        for oldId in self.trackedIDs:
            if oldId not in newTrackedIDs:
                featuresToRemove.add(oldId)

        for featureId in featuresToRemove:
            self.trackedFeaturesPath.pop(featureId)

        self.trackedIDs = newTrackedIDs

    def drawVanishingPoint(self, img, vanishing_point):
        cv2.circle(img, (int(vanishing_point[0]), int(vanishing_point[1])), self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)

    # Define color mapping for directions
    direction_colors = {
        "Up": (0, 255, 255),  # Yellow
        "Down": (0, 255, 0),  # Green
        "Left": (255, 0, 0),  # Blue
        "Right": (0, 0, 255), # Red
    }

    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):

        # Get the appropriate point color based on the prominent motion
        if prominent_motion in self.direction_colors:
            point_color = self.direction_colors[prominent_motion]
        else:
            point_color = self.pointColor

        for featurePath in self.trackedFeaturesPath.values():
            path = featurePath

            for j in range(len(path) - 1):
                src = (int(path[j].x), int(path[j].y))
                dst = (int(path[j + 1].x), int(path[j + 1].y))
                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)

            j = len(path) - 1
            cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, point_color, -1, cv2.LINE_AA, 0)

        # Draw the direction text on the image
        if prominent_motion:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            font_thickness = 2
            text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
            text_x = (img.shape[1] - text_size[0]) // 2
            text_y = text_size[1] + 20  # 20 pixels from the top

            # Get the appropriate color based on the prominent motion
            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))  # Default to white

            # Draw the text
            cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)


        # Draw vanishing point if provided
        if vanishing_point is not None:
            self.drawVanishingPoint(img, vanishing_point)

    def __init__(self, windowName):
        self.windowName = windowName
        cv2.namedWindow(windowName)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

def create_pipeline():
    pipeline = dai.Pipeline()

    # Create a MonoCamera node and set its properties
    mono_left = pipeline.create(dai.node.MonoCamera)
    mono_left.setCamera("left")
    mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    mono_left.setFps(15)

    # Create a FeatureTracker node
    feature_tracker_left = pipeline.create(dai.node.FeatureTracker)

    # Create XLinkOut nodes for output streams
    xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
    xout_passthrough_left = pipeline.create(dai.node.XLinkOut)

    # Set stream names
    xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
    xout_passthrough_left.setStreamName("passthroughLeft")

    # Allocate resources for improved performance
    num_shaves = 2
    num_memory_slices = 2
    feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)

    # Link the nodes
    mono_left.out.link(feature_tracker_left.inputImage)
    feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
    feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)

    return pipeline


if __name__ == '__main__':
    pipeline = create_pipeline()
    with dai.Device(pipeline) as device:
        output_features_left_queue = device.getOutputQueue(
            "trackedFeaturesLeft", maxSize=4, blocking=False)
        passthrough_image_left_queue = device.getOutputQueue(
            "passthroughLeft", maxSize=4, blocking=False)

        left_window_name = "Left"
        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
        camera_estimator_left = CameraMotionEstimator(
            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)

        while True:
            in_passthrough_frame_left = passthrough_image_left_queue.get()
            passthrough_frame_left = in_passthrough_frame_left.getFrame()
            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)

            tracked_features_left = output_features_left_queue.get().trackedFeatures
            motion_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
                left_feature_drawer.trackedFeaturesPath)

            left_feature_drawer.trackFeaturePath(tracked_features_left)
            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motion_left)

            print("Motion:", motion_left)
            cv2.imshow(left_window_name, left_frame)

            if cv2.waitKey(1) == ord('q'):
                break

Got questions?

Head over to the Discussion Forum for technical support or any other questions you might have.