From 8d81e3bc918bc9bf0013ac16596a83309049db50 Mon Sep 17 00:00:00 2001 From: Caglar Pir Date: Tue, 17 Feb 2026 05:02:23 -0800 Subject: [PATCH] Preserve GPS timestamps from GPX tracks in video CAMM output --- mapillary_tools/exiftool_read_video.py | 36 ++- mapillary_tools/geotag/utils.py | 15 +- mapillary_tools/serializer/description.py | 35 ++- mapillary_tools/uploader.py | 31 ++- schema/image_description_schema.json | 7 + tests/unit/test_camm_parser.py | 309 ++++++++++++++++++++++ tests/unit/test_description.py | 123 +++++++++ tests/unit/test_exiftool_aggregate_gps.py | 154 +++++++++++ tests/unit/test_parse_gpx.py | 122 +++++++++ 9 files changed, 814 insertions(+), 18 deletions(-) create mode 100644 tests/unit/test_exiftool_aggregate_gps.py create mode 100644 tests/unit/test_parse_gpx.py diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index 9eef37b3..a5c2fcc0 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -202,24 +202,41 @@ def _aggregate_float_values_same_length( # aggregate speeds (optional) ground_speeds = _aggregate_float_values_same_length(ground_speed_tag) - # GPS timestamp (optional) - epoch_time = None + # GPS epoch times (optional) if gps_time_tag is not None: - gps_time_text = _extract_alternative_fields(texts_by_tag, [gps_time_tag], str) - if gps_time_text is not None: - dt = exif_read.parse_gps_datetime(gps_time_text) - if dt is not None: - epoch_time = geo.as_unix_time(dt) + gps_epoch_times: list[float | None] = [ + geo.as_unix_time(dt) if dt is not None else None + for dt in ( + exif_read.parse_gps_datetime(text) + for text in _extract_alternative_fields( + texts_by_tag, [gps_time_tag], list + ) + or [] + ) + ] + if len(gps_epoch_times) != expected_length: + LOG.warning( + "Found different number of GPS epoch times %d and coordinates %d", + len(gps_epoch_times), + expected_length, + ) + gps_epoch_times = [None] * expected_length + elif time_tag is not None: + # Use per-point GPS timestamps as epoch times + gps_epoch_times = [t for t in timestamps] + else: + gps_epoch_times = [None] * expected_length # build track track: list[GPSPoint] = [] - for timestamp, lon, lat, alt, direction, ground_speed in zip( + for timestamp, lon, lat, alt, direction, ground_speed, epoch_time in zip( timestamps, lons, lats, alts, directions, ground_speeds, + gps_epoch_times, ): if timestamp is None or lon is None or lat is None: continue @@ -329,6 +346,7 @@ def _aggregate_gps_track_by_sample_time( lon_tag=lon_tag, lat_tag=lat_tag, alt_tag=alt_tag, + gps_time_tag=gps_time_tag, direction_tag=direction_tag, ground_speed_tag=ground_speed_tag, ) @@ -484,6 +502,7 @@ def _extract_gps_track_from_track(self) -> list[GPSPoint]: lon_tag=f"{track_ns}:GPSLongitude", lat_tag=f"{track_ns}:GPSLatitude", alt_tag=f"{track_ns}:GPSAltitude", + gps_time_tag=f"{track_ns}:GPSDateTime", direction_tag=f"{track_ns}:GPSTrack", ground_speed_tag=f"{track_ns}:GPSSpeed", gps_fix_tag=f"{track_ns}:GPSMeasureMode", @@ -521,5 +540,6 @@ def _extract_gps_track_from_quicktime( lon_tag=f"{namespace}:GPSLongitude", lat_tag=f"{namespace}:GPSLatitude", alt_tag=f"{namespace}:GPSAltitude", + gps_time_tag=f"{namespace}:GPSDateTime", direction_tag=f"{namespace}:GPSTrack", ) diff --git a/mapillary_tools/geotag/utils.py b/mapillary_tools/geotag/utils.py index 7be6debf..e1959347 100644 --- a/mapillary_tools/geotag/utils.py +++ b/mapillary_tools/geotag/utils.py @@ -12,7 +12,7 @@ import gpxpy -from .. import exiftool_read, geo, utils +from .. import exiftool_read, geo, telemetry, utils Track = T.List[geo.Point] LOG = logging.getLogger(__name__) @@ -29,13 +29,22 @@ def parse_gpx(gpx_file: Path) -> list[Track]: tracks.append([]) for point in segment.points: if point.time is not None: + unix_time = geo.as_unix_time(point.time) tracks[-1].append( - geo.Point( - time=geo.as_unix_time(point.time), + telemetry.CAMMGPSPoint( + time=unix_time, lat=point.latitude, lon=point.longitude, alt=point.elevation, angle=None, + time_gps_epoch=unix_time, + gps_fix_type=3 if point.elevation is not None else 2, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, ) ) diff --git a/mapillary_tools/serializer/description.py b/mapillary_tools/serializer/description.py index 9f50611b..ebd3095d 100644 --- a/mapillary_tools/serializer/description.py +++ b/mapillary_tools/serializer/description.py @@ -25,7 +25,7 @@ import jsonschema -from .. import exceptions, geo +from .. import exceptions, geo, telemetry from ..types import ( BaseSerializer, describe_error_metadata, @@ -196,6 +196,10 @@ class ErrorDescription(TypedDict, total=False): "type": ["number", "null"], "description": "Camera angle of the track point, in degrees. If null, the angle will be interpolated", }, + { + "type": ["number", "null"], + "description": "GPS epoch time of the track point, in seconds. If present, used as the authoritative timestamp", + }, ], }, }, @@ -509,18 +513,43 @@ def _from_video_desc(cls, desc: VideoDescription) -> VideoMetadata: class PointEncoder: @classmethod def encode(cls, p: geo.Point) -> T.Sequence[float | int | None]: - entry = [ + entry: list[float | int | None] = [ int(p.time * 1000), round(p.lon, _COORDINATES_PRECISION), round(p.lat, _COORDINATES_PRECISION), round(p.alt, _ALTITUDE_PRECISION) if p.alt is not None else None, round(p.angle, _ANGLE_PRECISION) if p.angle is not None else None, + p.get_gps_epoch_time(), ] return entry @classmethod def decode(cls, entry: T.Sequence[T.Any]) -> geo.Point: - time_ms, lon, lat, alt, angle = entry + if len(entry) >= 6 and entry[5] is not None: + time_ms, lon, lat, alt, angle, time_gps_epoch = ( + entry[0], + entry[1], + entry[2], + entry[3], + entry[4], + entry[5], + ) + return telemetry.CAMMGPSPoint( + time=time_ms / 1000, + lat=lat, + lon=lon, + alt=alt, + angle=angle, + time_gps_epoch=time_gps_epoch, + gps_fix_type=3 if alt is not None else 2, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + time_ms, lon, lat, alt, angle = entry[0], entry[1], entry[2], entry[3], entry[4] return geo.Point(time=time_ms / 1000, lon=lon, lat=lat, alt=alt, angle=angle) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index b4866707..f514229b 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -302,10 +302,33 @@ def prepare_camm_info( camm_info.gps.append(point) elif isinstance(point, telemetry.GPSPoint): - # There is no proper CAMM entry for GoPro GPS - if camm_info.mini_gps is None: - camm_info.mini_gps = [] - camm_info.mini_gps.append(point) + # Convert GPSPoint to CAMMGPSPoint if it has a valid epoch_time, + # so the GPS timestamp is preserved in the CAMM type 6 entry + if point.epoch_time is not None and point.epoch_time > 0: + camm_point = telemetry.CAMMGPSPoint( + time=point.time, + lat=point.lat, + lon=point.lon, + alt=point.alt, + angle=point.angle, + time_gps_epoch=point.epoch_time, + gps_fix_type=point.fix.value + if point.fix is not None + else (3 if point.alt is not None else 2), + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + if camm_info.gps is None: + camm_info.gps = [] + camm_info.gps.append(camm_point) + else: + if camm_info.mini_gps is None: + camm_info.mini_gps = [] + camm_info.mini_gps.append(point) elif isinstance(point, geo.Point): if camm_info.mini_gps is None: diff --git a/schema/image_description_schema.json b/schema/image_description_schema.json index 2172036f..ce294b4a 100644 --- a/schema/image_description_schema.json +++ b/schema/image_description_schema.json @@ -34,6 +34,13 @@ "null" ], "description": "Camera angle of the track point, in degrees. If null, the angle will be interpolated" + }, + { + "type": [ + "number", + "null" + ], + "description": "GPS epoch time of the track point, in seconds. If present, used as the authoritative timestamp" } ] } diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index 34d2ae9e..e4bd12d6 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -410,3 +410,312 @@ def test_camm_trak_carries_mvhd_timestamps(): mdhd = camm_track.extract_mdhd_boxdata() assert mdhd["creation_time"] == src_creation_time assert mdhd["modification_time"] == src_modification_time + + +def test_build_and_parse_gpx_sourced_camm_gps_points(): + """Test CAMMGPSPoint objects as created from GPX data (zeroed accuracy/velocity).""" + points = [ + telemetry.CAMMGPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + time_gps_epoch=1706000000.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + telemetry.CAMMGPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + time_gps_epoch=1706000001.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + telemetry.CAMMGPSPoint( + time=2.0, + lat=37.7751, + lon=-122.4196, + alt=12.0, + angle=None, + time_gps_epoch=1706000002.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + # Verify points round-trip with time_gps_epoch preserved + assert len(x.points) == 3 + for original, decoded in zip(points, x.points): + assert isinstance(decoded, telemetry.CAMMGPSPoint) + decoded_camm = T.cast(telemetry.CAMMGPSPoint, decoded) + assert abs(original.time_gps_epoch - decoded_camm.time_gps_epoch) < 10e-6 + assert abs(original.time - decoded_camm.time) < 10e-6 + assert abs(original.lat - decoded_camm.lat) < 10e-6 + assert abs(original.lon - decoded_camm.lon) < 10e-6 + + +def test_prepare_camm_info_gpspoint_with_epoch_time(): + """GPSPoint with valid epoch_time should be converted to CAMMGPSPoint and routed to camm_info.gps.""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_2D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # Should be routed to gps (type 6), not mini_gps (type 5) + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + assert camm_info.mini_gps is None + + # Verify conversion preserved fields + for original, converted in zip(points, camm_info.gps): + assert isinstance(converted, telemetry.CAMMGPSPoint) + assert converted.lat == original.lat + assert converted.lon == original.lon + assert converted.alt == original.alt + assert converted.time == original.time + assert converted.time_gps_epoch == original.epoch_time + + # Verify fix type was correctly converted from GPSFix enum + assert camm_info.gps[0].gps_fix_type == 3 # FIX_3D.value + assert camm_info.gps[1].gps_fix_type == 2 # FIX_2D.value + + +def test_prepare_camm_info_gpspoint_without_epoch_time(): + """GPSPoint without epoch_time should remain in mini_gps (type 5).""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=None, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=0, + fix=telemetry.GPSFix.FIX_2D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # Should stay in mini_gps (type 5) + assert camm_info.gps is None + assert camm_info.mini_gps is not None + assert len(camm_info.mini_gps) == 2 + + +def test_prepare_camm_info_gpspoint_no_fix(): + """GPSPoint with epoch_time but fix=None should infer fix type from altitude.""" + point_with_alt = telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=None, + precision=None, + ground_speed=None, + ) + point_without_alt = telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=None, + angle=None, + epoch_time=1706000001.0, + fix=None, + precision=None, + ground_speed=None, + ) + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=[point_with_alt, point_without_alt], + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + # With altitude -> 3D fix + assert camm_info.gps[0].gps_fix_type == 3 + # Without altitude -> 2D fix + assert camm_info.gps[1].gps_fix_type == 2 + + +def test_prepare_camm_info_mixed_point_types(): + """Test that mixed point types are correctly routed.""" + points: T.List[geo.Point] = [ + # CAMMGPSPoint -> gps + telemetry.CAMMGPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + time_gps_epoch=1706000000.0, + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=2.0, + velocity_east=0.1, + velocity_north=0.2, + velocity_up=0.3, + speed_accuracy=0.5, + ), + # GPSPoint with epoch_time -> converted to CAMMGPSPoint -> gps + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + # GPSPoint without epoch_time -> mini_gps + telemetry.GPSPoint( + time=2.0, + lat=37.7751, + lon=-122.4196, + alt=12.0, + angle=None, + epoch_time=None, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + # geo.Point -> mini_gps + geo.Point( + time=3.0, + lat=37.7752, + lon=-122.4197, + alt=13.0, + angle=None, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.CAMM, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # 2 points in gps (CAMMGPSPoint + converted GPSPoint) + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + assert camm_info.gps[0].time_gps_epoch == 1706000000.0 + assert camm_info.gps[1].time_gps_epoch == 1706000001.0 + + # 2 points in mini_gps (GPSPoint without epoch + geo.Point) + assert camm_info.mini_gps is not None + assert len(camm_info.mini_gps) == 2 + + +def test_prepare_camm_info_gpspoint_roundtrip(): + """GPSPoint with epoch_time should round-trip through CAMM encode/decode with timestamp preserved.""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_3D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + + # Should come back as CAMMGPSPoint with epoch time preserved + assert len(x.points) == 2 + for original, decoded in zip(points, x.points): + assert isinstance(decoded, telemetry.CAMMGPSPoint) + decoded_camm = T.cast(telemetry.CAMMGPSPoint, decoded) + assert abs(original.epoch_time - decoded_camm.time_gps_epoch) < 10e-6 + assert abs(original.lat - decoded_camm.lat) < 10e-6 + assert abs(original.lon - decoded_camm.lon) < 10e-6 diff --git a/tests/unit/test_description.py b/tests/unit/test_description.py index d955d6ef..4dd5c5c4 100644 --- a/tests/unit/test_description.py +++ b/tests/unit/test_description.py @@ -6,10 +6,12 @@ import json from pathlib import Path +from mapillary_tools import geo, telemetry from mapillary_tools.exceptions import MapillaryMetadataValidationError from mapillary_tools.serializer.description import ( DescriptionJSONSerializer, ImageVideoDescriptionFileSchema, + PointEncoder, validate_image_desc, ) @@ -119,3 +121,124 @@ def test_serialize_image_description_ok(): **actual_descs[0], "filename": Path(actual_descs[0]["filename"]).name, } + + +def test_encode_base_point(): + p = geo.Point( + time=1.5, lat=37.7749295, lon=-122.4194155, alt=10.1235, angle=90.1234 + ) + encoded = PointEncoder.encode(p) + assert len(encoded) == 6 + assert encoded[0] == 1500 # time in ms + assert encoded[1] == round(-122.4194155, 7) # lon + assert encoded[2] == round(37.7749295, 7) # lat + assert encoded[3] == round(10.1235, 3) # alt + assert encoded[4] == round(90.1234, 3) # angle + assert encoded[5] is None # no GPS epoch time + + +def test_decode_base_point(): + entry = [1500, -122.4194155, 37.7749295, 10.124, 90.123] + p = PointEncoder.decode(entry) + assert type(p) is geo.Point + assert p.time == 1.5 + assert p.lon == -122.4194155 + assert p.lat == 37.7749295 + assert p.alt == 10.124 + assert p.angle == 90.123 + + +def test_encode_camm_gps_point(): + p = telemetry.CAMMGPSPoint( + time=2.0, + lat=37.7749, + lon=-122.4194, + alt=15.0, + angle=180.0, + time_gps_epoch=1700000001.0, + gps_fix_type=3, + horizontal_accuracy=1.5, + vertical_accuracy=2.0, + velocity_east=0.5, + velocity_north=0.3, + velocity_up=0.1, + speed_accuracy=0.2, + ) + encoded = PointEncoder.encode(p) + assert len(encoded) == 6 + assert encoded[0] == 2000 + assert encoded[5] == 1700000001.0 + + +def test_decode_camm_gps_point(): + entry = [2000, -122.4194, 37.7749, 15.0, 180.0, 1700000001.0] + p = PointEncoder.decode(entry) + assert isinstance(p, telemetry.CAMMGPSPoint) + assert p.time == 2.0 + assert p.lat == 37.7749 + assert p.lon == -122.4194 + assert p.alt == 15.0 + assert p.angle == 180.0 + assert p.time_gps_epoch == 1700000001.0 + assert p.gps_fix_type == 3 # alt is not None + + +def test_encode_decode_roundtrip_base_point(): + original = geo.Point(time=3.456, lat=40.7128, lon=-74.006, alt=5.5, angle=270.0) + encoded = PointEncoder.encode(original) + decoded = PointEncoder.decode(encoded) + assert type(decoded) is geo.Point + # time is rounded to ms precision + assert decoded.time == int(original.time * 1000) / 1000 + assert decoded.lat == round(original.lat, 7) + assert decoded.lon == round(original.lon, 7) + assert decoded.alt == round(original.alt, 3) + assert decoded.angle == round(original.angle, 3) + + +def test_encode_decode_roundtrip_camm_gps_point(): + original = telemetry.CAMMGPSPoint( + time=4.0, + lat=51.5074, + lon=-0.1278, + alt=20.0, + angle=45.0, + time_gps_epoch=1700000500.0, + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=1.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + encoded = PointEncoder.encode(original) + decoded = PointEncoder.decode(encoded) + assert isinstance(decoded, telemetry.CAMMGPSPoint) + assert decoded.time_gps_epoch == original.time_gps_epoch + + +def test_decode_6_element_with_none_gps_epoch(): + entry = [1500, -122.4194, 37.7749, 10.0, 90.0, None] + p = PointEncoder.decode(entry) + # Should fall back to plain Point when 6th element is None + assert type(p) is geo.Point + assert p.time == 1.5 + + +def test_encode_gps_point_without_epoch(): + p = telemetry.GPSPoint( + time=5.0, + lat=35.6762, + lon=139.6503, + alt=40.0, + angle=0.0, + epoch_time=None, + fix=None, + precision=None, + ground_speed=None, + ) + encoded = PointEncoder.encode(p) + # get_gps_epoch_time() returns None, so 6th element is None + assert len(encoded) == 6 + assert encoded[5] is None diff --git a/tests/unit/test_exiftool_aggregate_gps.py b/tests/unit/test_exiftool_aggregate_gps.py new file mode 100644 index 00000000..923b783d --- /dev/null +++ b/tests/unit/test_exiftool_aggregate_gps.py @@ -0,0 +1,154 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +from __future__ import annotations + +import xml.etree.ElementTree as ET + +from mapillary_tools.exiftool_read_video import ( + _aggregate_gps_track, + _aggregate_gps_track_by_sample_time, + expand_tag, +) + + +def test_aggregate_gps_track_with_gps_time_tag(): + texts_by_tag = { + expand_tag("QuickTime:GPSLongitude"): ["29.0", "29.1"], + expand_tag("QuickTime:GPSLatitude"): ["36.7", "36.8"], + expand_tag("QuickTime:GPSDateTime"): [ + "2025-01-01 00:00:00Z", + "2025-01-01 00:00:01Z", + ], + expand_tag("QuickTime:GPSTimeStamp"): [ + "2025-01-01 00:00:10Z", + "2025-01-01 00:00:11Z", + ], + } + track = _aggregate_gps_track( + texts_by_tag, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + gps_time_tag="QuickTime:GPSTimeStamp", + ) + assert len(track) == 2 + for point in track: + assert point.epoch_time is not None + + +def test_aggregate_gps_track_gps_time_fallback_to_time_tag(): + texts_by_tag = { + expand_tag("QuickTime:GPSLongitude"): ["29.0", "29.1"], + expand_tag("QuickTime:GPSLatitude"): ["36.7", "36.8"], + expand_tag("QuickTime:GPSDateTime"): [ + "2025-01-01 00:00:00Z", + "2025-01-01 00:00:01Z", + ], + } + track = _aggregate_gps_track( + texts_by_tag, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + gps_time_tag=None, + ) + assert len(track) == 2 + # Without gps_time_tag, epoch_time should be populated from time_tag timestamps + for point in track: + assert point.epoch_time is not None + + +def test_aggregate_gps_track_no_time_tags(): + texts_by_tag = { + expand_tag("QuickTime:GPSLongitude"): ["29.0", "29.1"], + expand_tag("QuickTime:GPSLatitude"): ["36.7", "36.8"], + } + track = _aggregate_gps_track( + texts_by_tag, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + gps_time_tag=None, + ) + assert len(track) == 2 + for point in track: + assert point.epoch_time is None + + +def test_aggregate_gps_track_gps_time_length_mismatch(): + texts_by_tag = { + expand_tag("QuickTime:GPSLongitude"): ["29.0", "29.1", "29.2"], + expand_tag("QuickTime:GPSLatitude"): ["36.7", "36.8", "36.9"], + expand_tag("QuickTime:GPSDateTime"): [ + "2025-01-01 00:00:00Z", + "2025-01-01 00:00:01Z", + "2025-01-01 00:00:02Z", + ], + # Mismatched length: only 2 entries for 3 coordinates + expand_tag("QuickTime:GPSTimeStamp"): [ + "2025-01-01 00:00:10Z", + "2025-01-01 00:00:11Z", + ], + } + track = _aggregate_gps_track( + texts_by_tag, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + gps_time_tag="QuickTime:GPSTimeStamp", + ) + assert len(track) == 3 + # Length mismatch triggers fallback: all epoch_time should be None + for point in track: + assert point.epoch_time is None + + +def _make_element(tag: str, text: str) -> ET.Element: + el = ET.Element(expand_tag(tag)) + el.text = text + return el + + +def test_aggregate_gps_track_by_sample_time_with_gps_time(): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "29.0"), + _make_element(f"{track_ns}:GPSLatitude", "36.7"), + _make_element(f"{track_ns}:GPSDateTime", "2025-01-01 00:00:00Z"), + _make_element(f"{track_ns}:GPSLongitude", "29.1"), + _make_element(f"{track_ns}:GPSLatitude", "36.8"), + _make_element(f"{track_ns}:GPSDateTime", "2025-01-01 00:00:01Z"), + ] + sample_iterator = [(0.0, 2.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_time_tag=f"{track_ns}:GPSDateTime", + ) + assert len(track) == 2 + for point in track: + assert point.epoch_time is not None + + +def test_aggregate_gps_track_by_sample_time_no_gps_time(): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "29.0"), + _make_element(f"{track_ns}:GPSLatitude", "36.7"), + _make_element(f"{track_ns}:GPSLongitude", "29.1"), + _make_element(f"{track_ns}:GPSLatitude", "36.8"), + ] + sample_iterator = [(0.0, 2.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_time_tag=None, + ) + assert len(track) == 2 + for point in track: + assert point.epoch_time is None diff --git a/tests/unit/test_parse_gpx.py b/tests/unit/test_parse_gpx.py new file mode 100644 index 00000000..17ca9770 --- /dev/null +++ b/tests/unit/test_parse_gpx.py @@ -0,0 +1,122 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +from __future__ import annotations + +import tempfile +from pathlib import Path + +from mapillary_tools import geo, telemetry +from mapillary_tools.geotag.utils import parse_gpx + +FIXTURE_DIR = Path(__file__).resolve().parent.parent / "data" +GPX_FILE = FIXTURE_DIR / "gpx" / "sf_30km_h.gpx" + + +def test_parse_gpx_creates_camm_gps_points(): + tracks = parse_gpx(GPX_FILE) + assert len(tracks) == 1 + track = tracks[0] + assert len(track) > 0 + + for point in track: + assert isinstance(point, telemetry.CAMMGPSPoint) + assert point.time_gps_epoch == point.time + assert point.gps_fix_type == 3 # all points have + assert point.horizontal_accuracy == 0.0 + assert point.vertical_accuracy == 0.0 + assert point.velocity_east == 0.0 + assert point.velocity_north == 0.0 + assert point.velocity_up == 0.0 + assert point.speed_accuracy == 0.0 + assert point.angle is None + assert point.alt is not None + + +def test_parse_gpx_without_elevation(): + gpx_content = """\ + + + + + + + + + + + + + +""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".gpx", delete=False) as f: + f.write(gpx_content) + f.flush() + tracks = parse_gpx(Path(f.name)) + + assert len(tracks) == 1 + for point in tracks[0]: + assert isinstance(point, telemetry.CAMMGPSPoint) + assert point.gps_fix_type == 2 + assert point.alt is None + + +def test_parse_gpx_skips_points_without_time(): + gpx_content = """\ + + + + + + + + + + + + + + + +""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".gpx", delete=False) as f: + f.write(gpx_content) + f.flush() + tracks = parse_gpx(Path(f.name)) + + assert len(tracks) == 1 + # The point without