Source code for gpxity.track

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.

"""This module defines :class:`~gpxity.track.Track`."""

# pylint: disable=protected-access

from math import asin, sqrt, degrees
import datetime
from contextlib import contextmanager
from functools import total_ordering
import weakref
from copy import deepcopy
import logging

# This code would speed up parsing GPX by about 30%. When doing
# that, GPX will only return str instead of datetime for times.
#
# import gpxpy.gpxfield as mod_gpxfield
# mod_gpxfield.TIME_TYPE=None

# pylint: disable=too-many-lines

from gpxpy import gpx as mod_gpx
from gpxpy import parse as gpxpy_parse
from gpxpy.geo import length as gpx_length, Location
from gpxpy.geo import simplify_polyline

from .util import repr_timespan, uniq, positions_equal

GPX = mod_gpx.GPX
GPXTrack = mod_gpx.GPXTrack
GPXTrackSegment = mod_gpx.GPXTrackSegment
GPXXMLSyntaxException = mod_gpx.GPXXMLSyntaxException

# see https://github.com/tkrajina/gpxpy/issues/150
Location.__hash__ = lambda x: int(x.latitude * 1000) + int(x.longitude * 1000) + int(x.elevation * 10)  # noqa


__all__ = ['Track', 'Fences']


class Fences:  # pylint: disable=too-few-public-methods

    """Defines circles.

    Args:
        config_str: The string from auth.cfg

    Attributes:
        center (GPXTrackPoint): The center
        radius (meter): The radius in meters

    """

    def __init__(self, config_str: str):
        """init."""
        self.circles = list()
        if config_str is not None:
            for fence in config_str.split(' '):
                parts = fence.split('/')
                if len(parts) != 3:
                    raise Exception('fence needs 3 parts')
                try:
                    parts = [x.strip() for x in parts]
                    center = Location(float(parts[0]), float(parts[1]))
                    radius = float(parts[2])
                except Exception:
                    raise Exception('Fence definition is wrong: {}'.format(fence))
                circle = (center, radius)
                self.circles.append(circle)

    def outside(self, point) -> bool:
        """Determine if point is outside of all fences.

        Returns: True or False.

        """
        return all(point.distance_2d(x[0]) > x[1] for x in self.circles)
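
    # Illustrative usage sketch (not part of the original module; the concrete
    # coordinates are made up): a fence definition as it could appear in auth.cfg,
    # two circles given as "latitude/longitude/radius_in_meters", separated by blanks.
    #
    #     fences = Fences('53.5503/9.9937/500 48.1371/11.5754/1000')
    #     fences.outside(Location(53.5505, 9.9940))   # False: inside the first circle
    #     fences.outside(Location(50.0000, 8.0000))   # True: outside both circles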


@total_ordering
class Track:  # pylint: disable=too-many-public-methods

    """Represents a track.

    A Track is essentially a GPX file. A GPX file may contain multiple tracks,
    but whenever this documentation says track or Track, it does not refer to
    one of possibly multiple entities in the GPX file: it refers to this class Track.

    If a backend supports attributes not directly supported by the GPX format
    like the MapMyTracks track type, they will transparently be encoded in
    existing GPX fields like keywords, see :attr:`keywords`.

    The GPX part is done by https://github.com/tkrajina/gpxpy.

    If a track is assigned to a backend, all changes will by default be written
    directly to the backend. Some backends are able to change only one attribute
    with little time overhead, others always have to rewrite the entire track.
    However you can use the context manager :meth:`batch_changes`. This holds
    back updating the backend until leaving the context.

    If you manipulate the gpx directly, this goes unnoticed by the updating
    mechanism. Use :meth:`rewrite` when done.

    Not all backends support everything, you could get the exception
    NotImplementedError.

    All points are always rounded to 6 decimal digits when they are added to
    the track. However some backends may support less than 6 decimals. You can
    query Backend.point_precision.

    The data will only be loaded from the backend when it is needed. Some
    backends might support loading some attributes separately, but we do not
    make use of that. However backends have two ways of loading data: either
    load a list of Tracks or load all information about a specific track.
    Often loading the list of tracks gives us some attributes for free, so
    listing those tracks may be much faster if you do not want everything listed.

    Args:
        gpx (GPX): Initial content. To be used if you create a new Track from
            scratch without loading it from some backend.

    Attributes:
        categories (tuple(str)): The legal values for :attr:`~Track.category`.
            The first one is used as default value. This is a superset of the
            values for the different backends. Every backend maps from its
            internal values into those when reading and maps them back when
            writing. Since not all backends support all values defined here
            and since some backends may define more values than we know,
            information may get lost when converting.

    """

    # pylint: disable = too-many-instance-attributes,too-many-public-methods

    class CannotMerge(Exception):

        """Is raised if Track.merge() fails."""

    categories = (
        'Cycling', 'Cycling - Road', 'Cycling - Gravel', 'Cycling - MTB',
        'Cycling - Indoor', 'Cycling - Hand', 'Cycling - Touring', 'Cycling - Foot',
        'Running', 'Running - Trail', 'Running - Urban Trail', 'Sailing', 'Walking',
        'Hiking', 'Hiking - Speed', 'Swimming', 'Driving', 'Off road driving',
        'Motor racing', 'Motorcycling', 'Enduro', 'Skiing', 'Skiing - Touring',
        'Skiing - Backcountry', 'Skiing - Nordic', 'Skiing - Alpine', 'Skiing - Roller',
        'Canoeing', 'Kayaking', 'Sea kayaking', 'Stand up paddle boarding', 'Rowing',
        'Windsurfing', 'Kiteboarding', 'Orienteering', 'Mountaineering', 'Skating',
        'Skateboarding', 'Horse riding', 'Hang gliding', 'Gliding', 'Flying',
        'Snowboarding', 'Paragliding', 'Hot air ballooning', 'Nordic walking',
        'Snowshoeing', 'Jet skiing', 'Powerboating', 'Swimrun', 'Pedelec',
        'Crossskating', 'Motorhome', 'Cabriolet', 'Coach', 'Pack animal trekking',
        'Train', 'Wheelchair', 'Sightseeing', 'Geocaching', 'Longboard',
        'River navigation', 'Skating - Inline', 'Miscellaneous')

    def __init__(self, gpx=None):
        """See class docstring."""
        self.__dirty = list()
        self._batch_changes = False
        self.__category = self.categories[0]
        self.__public = False
        self._ids = list()
        self.__id_in_backend = None
        self.__backend = None
        self._loaded = False
        self._header_data = dict()  # only for internal use because we clear data on _full_load()
        self.__gpx = gpx or GPX()
        self.__backend = None
        self.__cached_distance = None
        self._similarity_others = weakref.WeakValueDictionary()
        self._similarities = dict()
        self.__without_fences = None  # used by context manager "fenced()"
        if gpx:
            self._decode_keywords(self.__gpx.keywords)
            self._round_points(self.points())
            self._loaded = True

    @property
    def backend(self):
        """The backend this track lives in.

        If the track was constructed in memory, backend is None.
        This is a read-only property. It is set with
        :meth:`Backend.add <gpxity.backend.Backend.add>`.

        It is not possible to decouple a track from its backend, use :meth:`clone()`.

        Returns:
            The backend

        """
        return self.__backend

    # :attr:`Track.id_in_backend <gpxity.track.Track.id_in_backend>`.
    @property
    def id_in_backend(self) -> str:
        """Every backend has its own scheme for unique track ids.

        Some backends may change the id if the track data changes.

        Returns:
            the id in the backend

        """
        return self.__id_in_backend

    @id_in_backend.setter
    def id_in_backend(self, value: str) -> None:
        """Change the id in the backend. Currently supported only by Directory.

        Args:
            value: The new value

        """
        if value is not None:
            if not isinstance(value, str):
                raise Exception('{}: id_in_backend must be str'.format(value))
            if '/' in value:
                raise Exception('{}: / not allowed in id_in_backend'.format(value))
        if self.__id_in_backend == value:
            return
        if self.__id_in_backend:
            self._ids.insert(0, (str(self.backend), self.__id_in_backend))
        if self.__is_decoupled:
            # internal use
            self.__id_in_backend = value
        else:
            if not self.__id_in_backend:
                raise Exception('Cannot set id_in_backend for yet unsaved track {}'.format(self))
            if not value:
                raise Exception('Cannot remove id_in_backend for saved track {}'.format(self))
            with self._decouple():
                self.backend._change_id(self, value)

    def _set_backend(self, value):
        """To be used only by backend implementations."""
        assert self.__is_decoupled
        old_backend = self.__backend
        self.__backend = value
        if self.__gpx.keywords:
            if old_backend is None or old_backend.__class__ != value.__class__:
                # encode keywords for the new backend
                # TODO: unittest
                self.change_keywords(self.__gpx.keywords)

    def rewrite(self) -> None:
        """Call this after you directly manipulated :attr:`gpx`."""
        if not self._loaded:
            raise Exception('Track.rewrite: The track must already be loaded fully')
        self._dirty = 'gpx'

    @property
    def _dirty(self) -> list:
        """Check if the track is in sync with the backend.

        Setting :attr:`_dirty` will directly write the changed data into the backend.

        :attr:`_dirty` can receive an arbitrary string like 'title'. If the
        backend has a method _write_title, that one will be called. Otherwise
        the entire track will be written by the backend.

        Returns:
            list: The names of the attributes currently marked as dirty.

        """
        return self.__dirty

    @_dirty.setter
    def _dirty(self, value):
        """See dirty.getter."""
        if not isinstance(value, str):
            raise Exception('_dirty only receives str')
        if value in self._header_data:
            del self._header_data[value]
        if value == 'gpx':
            self._uncache_gpx()
        if not self.__is_decoupled:
            if value == 'gpx':
                if self.__without_fences is not None:
                    raise Exception(
                        '{}: You may not modify gpx while being in the context manager "fenced()"'.format(self))
            self.__dirty.append(value)
            if not self._batch_changes:
                self._rewrite()

    def _uncache_gpx(self):
        """gpx has changed: clear caches."""
        self.__cached_distance = None
        for other in self._similarity_others.values():
            del other._similarity_others[id(self)]  # TODO: unittest where other does not exist anymore
            del other._similarities[id(self)]
        self._similarity_others = weakref.WeakValueDictionary()
        self._similarities = dict()

    def _clear_dirty(self):
        """To be used by the backend when saving."""
        self.__dirty = list()

    def clone(self):
        """Create a new track with the same content but without backend.

        Returns:
            ~gpxity.track.Track: the new track

        """
        self.__resolve_header_data()
        result = Track(gpx=self.gpx.clone())
        result.category = self.category
        result.public = self.public
        result._ids = deepcopy(self._ids)
        if self.backend is not None:
            result._ids.insert(0, (str(self.backend), self.__id_in_backend))
        return result

    def _rewrite(self):
        """Rewrite all changes in the associated backend.

        If any of those conditions is met, do nothing:

        - we are currently loading from backend: Avoid recursion
        - batch_changes is active
        - we have no backend

        Otherwise the backend will save this track.

        """
        if self.backend is None:
            self._clear_dirty()
        if not self.__dirty:
            return
        if 'write' not in self.backend.supported:
            # TODO: unittest
            raise Exception('Rewriting {}: "write" is not supported'.format(self))
        if not self.__is_decoupled and not self._batch_changes:
            with self._decouple():
                self.backend._rewrite(self, self.__dirty)
            self._clear_dirty()

    def remove(self):
        """Remove this track in the associated backend.

        If the track is not coupled with a backend, raise an Exception.

        """
        if self.backend is None:
            raise Exception('{}: Removing needs a backend'.format(self))
        self.backend.remove(self.id_in_backend)

    @property
    def time(self) -> datetime.datetime:
        """datetime.datetime: start time of track.

        For a simpler implementation of backends, notably MMT, we ignore
        gpx.time. Instead we return the time of the earliest track point.
        Only if there is no track point, return gpx.time. If that is unknown
        too, return None.

        For the same reason, time is readonly.

        We assume that the first point comes first in time and the last point
        comes last in time. In other words, points should be ordered by their time.

        """
        if 'time' in self._header_data:
            return self._header_data['time']
        self._load_full()
        try:
            return next(self.points()).time
        except StopIteration:
            pass

    @property
    def title(self) -> str:
        """str: The title.

        Returns:
            the title

        """
        if 'title' in self._header_data:
            return self._header_data['title']
        self._load_full()
        return self.__gpx.name or ''

    @title.setter
    def title(self, value: str):
        """see getter."""
        if value != self.__gpx.name:
            self._load_full()
            self.__gpx.name = value
            self._dirty = 'title'

    @property
    def description(self) -> str:
        """str: The description.

        Returns:
            The description

        """
        if 'description' in self._header_data:
            return self._header_data['description']
        self._load_full()
        return self.__gpx.description or ''

    @description.setter
    def description(self, value: str):
        """see getter."""
        if value != self.__gpx.description:
            self._load_full()
            self.__gpx.description = value
            self._dirty = 'description'

    @contextmanager
    def fenced(self, fences):
        """Suppress points in fences."""
        if self.__without_fences is not None:
            raise Exception('fenced() is already active')
        self.__without_fences = self.__gpx
        try:
            if fences:
                for track in self.__gpx.tracks:
                    for segment in track.segments:
                        segment.points = [x for x in segment.points if fences.outside(x)]
                self.__gpx.waypoints = [x for x in self.gpx.waypoints if fences.outside(x)]
                self._uncache_gpx()
            yield
        finally:
            self.__gpx = self.__without_fences
            self._uncache_gpx()
            self.__without_fences = None
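
    # Illustrative usage sketch (not part of the original module; the fence definition
    # is made up): export a track while points inside the fences are suppressed.
    # Note that modifying gpx inside the context is rejected by the _dirty setter.
    #
    #     fences = Fences('53.5503/9.9937/750')
    #     with track.fenced(fences):
    #         xml = track.to_xml()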

    @contextmanager
    def _decouple(self):
        """Context manager: disable automatic synchronization with the backend.

        In that state, automatic writes of changes into the backend are disabled,
        and if you access attributes which would normally trigger a full load from
        the backend, they will not. (The latter is used by __str__ and __repr__.)

        """
        from_backend = self.__backend
        prev_value = from_backend._decoupled if from_backend is not None else None
        if from_backend is not None:
            from_backend._decoupled = True
        try:
            yield
        finally:
            if from_backend is not None:
                from_backend._decoupled = prev_value

    @property
    def __is_decoupled(self):
        """True if we are currently decoupled from the backend.

        In that state, changes to Track are not written to the backend and the
        track is not marked dirty.

        Returns:
            True if we are decoupled

        """
        if self.backend is None:
            return True
        return self.backend._decoupled

    @contextmanager
    def batch_changes(self):
        """Context manager: disable the direct update in the backend and save the entire track when done.

        This may or may not make things faster. Directory and GPSIES profit
        from this, MMT maybe.

        """
        prev_batch_changes = self._batch_changes
        self._batch_changes = True
        try:
            yield
        finally:
            self._batch_changes = prev_batch_changes
            self._rewrite()
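
    # Illustrative usage sketch (not part of the original module; the attribute
    # values are made up): change several attributes, but let the backend write
    # the track only once, when the context exits.
    #
    #     with track.batch_changes():
    #         track.title = 'Morning ride'
    #         track.description = 'Commute across town'
    #         track.category = 'Cycling'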

    @property
    def category(self) -> str:
        """str: What is this track doing? If we have no current value, return the default.

        The value is automatically translated between our internal value and
        the value used by the backend. This happens when reading from or
        writing to the backend.

        Returns:
            The current value or the default value (see :attr:`categories`)

        """
        if not self._loaded and 'category' in self._header_data:
            return self._header_data['category']
        self._load_full()
        return self.__category or self.__default_category()

    def __default_category(self) -> str:
        """The default for either an unsaved track or for the corresponding backend.

        Returns:
            The category

        """
        if self.backend is None:
            return self.categories[0]
        return self.backend.decode_category(self.backend.supported_categories[0])

    @category.setter
    def category(self, value: str):
        """see getter."""
        if value is None:
            value = self.__default_category()
        if value != self.__category:
            if value not in self.categories:
                raise Exception('Category {} is not known'.format(value))
            self._load_full()
            self.__category = value
            self._dirty = 'category'

    def _load_full(self) -> None:
        """Load the full track from source_backend if not yet loaded."""
        if (self.backend is not None and self.id_in_backend and not self._loaded
                and not self.__is_decoupled and 'scan' in self.backend.supported):  # noqa
            self.backend._read_all_decoupled(self)
            self._loaded = True
            if not self.__is_decoupled:
                self.__resolve_header_data()

    def __resolve_header_data(self):
        """Put header data into gpx and clear them."""
        with self._decouple():
            pairs = [(x[0], x[1]) for x in self._header_data.items()]
            for key, value in pairs:
                if key in ('title', 'description', 'category', 'public', 'keywords'):
                    setattr(self, key, value)
                elif key in ('time', 'distance'):
                    pass
                elif key == 'ids':
                    self._ids = value
                else:
                    raise Exception('Unhandled header_data: {}/{}'.format(key, value))
            self._header_data.clear()

    def add_points(self, points) -> None:
        """Add points to the last segment in the last track.

        If no track is allocated yet and points is not an empty list, allocates a track.

        Args:
            points (list(GPXTrackPoint)): The points to be added

        """
        if points:
            self._add_points(points)
            self._dirty = 'gpx'

    def _add_points(self, points):
        """Just add without setting dirty."""
        if points:
            if self.__gpx.tracks:
                # make sure the same points are not added twice
                _ = self.__gpx.tracks[-1].segments[-1]
                assert points != _.points[-len(points):]
            self._load_full()
            if not self.__gpx.tracks:
                self.__gpx.tracks.append(GPXTrack())
                self.__gpx.tracks[0].segments.append(GPXTrackSegment())
            self._round_points(points)
            self.__gpx.tracks[-1].segments[-1].points.extend(points)

    def _decode_keywords(self, data: str, into_header_data: bool = False):  # noqa
        """Extract our special keywords Category: and Status:.

        # TODO: why is this not backend specific? Some backends have
        # their own fields for this

        Args:
            data: The raw keywords coming from the backend
            into_header_data: if False, set the real track fields.
                If True, save everything in self._header_data.

        """
        # pylint: disable=too-many-branches
        gpx_keywords = list()
        ids = list()
        if isinstance(data, str):
            data = [x.strip() for x in data.split(', ')]
        if data is not None:
            for keyword in data:
                _ = [x.strip() for x in keyword.split(':')]
                what = _[0]
                value = ':'.join(_[1:])
                if into_header_data:
                    if what == 'Category':
                        self._header_data['category'] = value
                    elif what == 'Status':
                        self._header_data['public'] = value == 'public'
                    elif what == 'Id':
                        _ = value.split('/')
                        backend_name = '/'.join(_[:-1])
                        if backend_name == '':
                            backend_name = '.'
                        ids.append((backend_name, _[-1]))
                    else:
                        gpx_keywords.append(keyword)
                else:
                    if what == 'Category':
                        self.category = value
                    elif what == 'Status':
                        self.public = value == 'public'
                    elif what == 'Id':
                        _ = value.split('/')
                        backend_name = '/'.join(_[:-1])
                        if backend_name == '':
                            backend_name = '.'
                        ids.append((backend_name, _[-1]))
                    else:
                        gpx_keywords.append(keyword)
        gpx_keywords = [x[1:] if x.startswith('-') else x for x in gpx_keywords]
        if into_header_data:
            self._header_data['keywords'] = sorted(gpx_keywords)
            self._header_data['ids'] = ids
        else:
            if 'keywords' in self._header_data:
                del self._header_data['keywords']
            self.__gpx.keywords = ', '.join(sorted(gpx_keywords))
            if 'ids' in self._header_data:
                del self._header_data['ids']
            self._ids = ids

    def _encode_keywords(self) -> str:
        """Add our special keywords Category and Status.

        Returns:
            The full list of keywords as one str

        """
        result = self.keywords
        result.append('Category:{}'.format(self.category))
        result.append('Status:{}'.format('public' if self.public else 'private'))
        for _ in self.ids:
            # TODO: encode the , to something else
            result.append('Id:{}/{}'.format(*_))
        return ', '.join(result)
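
    # Illustrative sketch of the round trip done by _decode_keywords() and
    # _encode_keywords() (not part of the original module; the example values
    # are made up): a GPX keywords string like
    #
    #     'Category:Cycling, Id:./abc123, Status:public, berlin'
    #
    # decodes into category='Cycling', public=True, one ('.', 'abc123') entry for
    # ids and the single visible keyword 'berlin'; to_xml() re-adds the special
    # Category/Status/Id entries when exporting.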

    def parse(self, indata):
        """Parse GPX.

        :attr:`title`, :attr:`description` and :attr:`category` from indata
        have precedence over the current values.
        :attr:`public` will be or-ed.
        :attr:`keywords` will stay unchanged if indata has none, otherwise
        be replaced from indata.

        Args:
            indata: may be a file descriptor or str

        """
        assert self.__is_decoupled
        if hasattr(indata, 'read'):
            indata = indata.read()
        if not indata:
            # ignore empty file
            return
        with self._decouple():
            old_title = self.title
            old_description = self.description
            old_public = self.public
            try:
                self.__gpx = gpxpy_parse(indata)
            except GPXXMLSyntaxException as exc:
                self.backend.logger.error(
                    '%s: Track %s has illegal GPX XML: %s',
                    self.backend, self.id_in_backend, exc)
                raise
            if self.__gpx.keywords:
                self._decode_keywords(self.__gpx.keywords)
            self.public = self.__public or old_public
            if old_title and not self.__gpx.name:
                self.__gpx.name = old_title
            if old_description and not self.__gpx.description:
                self.__gpx.description = old_description
            self._round_points(self.points())
            self._header_data = dict()
            self._loaded = True
            self.__workaround_for_gpxpy_issue_140()

    def __workaround_for_gpxpy_issue_140(self):
        """Remove empty track extension elements.

        Maybe we have to do that for other extension elements too, but I hope
        gpxity can soon enough depend on a fixed stable version of gpxpy.

        """
        for track in self.gpx.tracks:
            track.extensions = [x for x in track.extensions if len(x) or x.text is not None]

    @staticmethod
    def _round_points(points):
        """Round points to 6 decimal digits because some backends may cut last digits.

        Gpsies truncates to 7 digits. The points are rounded in place!

        Args:
            points (list(GPXTrackPoint)): The points to be rounded

        """
        for _ in points:
            _.longitude = round(_.longitude, 6)
            _.latitude = round(_.latitude, 6)

    def to_xml(self) -> str:
        """Produce exactly one line per trackpoint for easier editing (like removal of unwanted points).

        Returns:
            The xml string.

        """
        self._load_full()
        old_keywords = self.__gpx.keywords
        try:
            self.__gpx.keywords = self._encode_keywords()
            result = self.__gpx.to_xml()
            result = result.replace('</trkpt><', '</trkpt>\n<')
            result = result.replace('<copyright ></copyright>', '')  # gpxviewer does not accept such illegal xml
            result = result.replace('<link ></link>', '')
            result = result.replace('<author>\n</author>\n', '')
            result = result.replace('\n</trkpt>', '</trkpt>')
            result = result.replace('>\n<ele>', '><ele>')
            result = result.replace('>\n<time>', '><time>')
            result = result.replace('</ele>\n<time>', '</ele><time>')
            result = result.replace('.0</ele>', '</ele>')  # this could differ depending on the source
            # for newer gpxpy 1.3.3 which indents the xml:
            result = result.replace('\n </trkpt>', '</trkpt>')
            result = result.replace('>\n <ele>', '><ele>')
            result = result.replace('>\n <time>', '><time>')
            result = result.replace('\n\n', '\n')
            if not result.endswith('\n'):
                result += '\n'
        finally:
            self.__gpx.keywords = old_keywords
        return result

    @property
    def public(self):
        """bool: Is this a private track (can only be seen by the account holder) or is it public?

        Default value is False.

        Returns:
            True if track is public, False if it is private

        """
        if 'public' in self._header_data:
            return self._header_data['public']
        self._load_full()
        return self.__public

    @public.setter
    def public(self, value):
        """Store this flag as keyword 'public'."""
        if value != self.__public:
            self._load_full()
            self.__public = value
            self._dirty = 'public'

    @property
    def gpx(self) -> GPX:
        """Direct access to the GPX object.

        If you use it to change its content, remember to call :meth:`rewrite` afterwards.

        Returns:
            the GPX object

        """
        self._load_full()
        return self.__gpx

    @property
    def last_time(self) -> datetime.datetime:
        """The last time we received.

        Returns:
            The last time we received so far. If none, return None.

        """
        _ = self.last_point()
        return _.time if _ else None

    @property
    def keywords(self):
        """list(str): represent them as a sorted list - in GPX they are comma separated.

        Content is whatever you want.

        Because the GPX format does not have attributes for everything used by
        all backends, we encode some of the backend arguments in keywords.

        Example for mapmytracks: keywords = 'Status:public, Category:Cycling'.

        However this is transparent for you. When parsing the GPX file, those
        are removed from keywords, and they are re-added when exporting in
        :meth:`to_xml`. So :attr:`Track.keywords` will never show those special values.

        Some backends may change keywords. :class:`~gpxity.backends.mmt.MMT`
        converts the first character into upper case and will return it like
        that. Gpxity will not try to hide such problems. So if you save a track
        in :class:`~gpxity.backends.mmt.MMT`, its keywords will change. But they
        will not change if you copy from :class:`~gpxity.backends.mmt.MMT` to
        :class:`~gpxity.backends.directory.Directory` - so if you copy from
        DirectoryA to :class:`~gpxity.backends.mmt.MMT` to DirectoryB, the
        keywords in DirectoryA and DirectoryB will not be identical, for example
        "berlin" in DirectoryA but "Berlin" in DirectoryB.

        """
        if 'keywords' in self._header_data:
            # TODO: unittest checking that keywords is always a deep copy
            return self._header_data['keywords'][:]
        return self.__gpx_keywords()

    def __gpx_keywords(self):
        """Return the keywords from GPX.

        Returns:
            A list with keywords

        """
        self._load_full()
        if self.__gpx.keywords:
            return list(sorted(x.strip() for x in self.__gpx.keywords.split(',')))
        return list()

    @keywords.setter
    def keywords(self, values):
        """Replace all keywords.

        Args:
            values: Either a single str with one or more keywords, separated by
                commas, or an iterable of keywords. The new keywords.
                Must not have duplicates.

        """
        self.change_keywords(values, replace=True)

    @staticmethod
    def _check_keyword(keyword):
        """Must not be one of our internally used codes."""
        internal = (('Category', 'category'), ('Status', 'public'), ('Id', 'ids'))
        for internal_kw, attr in internal:
            if keyword.startswith(internal_kw + ':'):
                raise Exception('Do not use {} directly, use Track.{}'.format(internal_kw, attr))
        if ',' in keyword:
            raise Exception('No comma allowed within a keyword')

    def __prepare_keywords(self, values):
        """Common introductory code for change_keywords.

        The values may be preceded with a '-' which will be preserved in the result.

        Args:
            values: Either a single str with one or more keywords, separated by
                commas, or an iterable of keywords.

        Returns:
            Two sets (adding, removing) of legal keywords as expected by the backend.

        """
        if isinstance(values, str):
            lst = [x.strip() for x in values.split(',')]
        else:
            lst = list(values)
        pairs = list()
        for _ in lst:
            if _.startswith('-'):
                pairs.append((False, _[1:]))
            else:
                pairs.append((True, _))
        for _ in pairs:
            self._check_keyword(_[1])
        if self.backend is not None:
            pairs = [(x[0], self.backend._encode_keyword(x[1])) for x in pairs]
        adding = {x[1] for x in pairs if x[0]}
        removing = {x[1] for x in pairs if not x[0]}
        return adding, removing

    def change_keywords(self, values, replace=False, dry_run=False):
        """Change keywords.

        Duplicate keywords are silently ignored. A keyword may not contain a comma.
        Keywords with a preceding '-' are removed, the others are added.
        Raise an Exception if a keyword is both added and removed.

        Args:
            values: Either a single str with one or more keywords, separated by
                commas, or an iterable of keywords
            replace: if True, replace current keywords with the new ones.
                Ignores keywords preceded with a '-'.
            dry_run: if True, only return the new keywords but do not make that change

        Returns:
            The new keywords

        """
        self._load_full()
        have = {*self.keywords}
        add, remove = self.__prepare_keywords(values)
        if add & remove:
            raise Exception('Cannot both add and remove keywords {}'.format(add & remove))
        if replace:
            remove = have - add
        else:
            add -= have
            remove &= have
        new = (have | add) - remove
        new_kw_list = ', '.join(sorted(new))
        if not dry_run and self.__gpx.keywords != new_kw_list:
            self.__gpx.keywords = new_kw_list
            with self.batch_changes():
                if remove:
                    self._dirty = 'remove_keywords:{}'.format(', '.join(remove))
                if add:
                    self._dirty = 'add_keywords:{}'.format(', '.join(add))
            assert sorted(new) == self.keywords, (
                'change_keywords failed. Expected: {}, got: {}'.format(sorted(new), self.keywords))
        return sorted(new)
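
    # Illustrative usage sketch (not part of the original module; keyword names
    # are made up): add and remove keywords in one call, or replace them all.
    #
    #     track.change_keywords('commute, -test')        # add 'commute', remove 'test'
    #     track.change_keywords('berlin', replace=True)  # now 'berlin' is the only keyword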

    def speed(self) -> float:
        """Speed over the entire time in km/h or 0.0.

        Returns:
            The speed

        """
        time_range = (self.time, self.last_time)
        if time_range[0] is None or time_range[1] is None:
            return 0.0
        duration = time_range[1] - time_range[0]
        seconds = duration.days * 24 * 3600 + duration.seconds
        if seconds:
            return self.distance() / seconds * 3600
        return 0.0

    def moving_speed(self) -> float:
        """Speed for time in motion in km/h.

        Returns:
            The moving speed

        """
        bounds = self.gpx.get_moving_data()
        if bounds.moving_time:
            return bounds.moving_distance / bounds.moving_time * 3.6
        return 0.0

    def warnings(self):
        """Return a list of strings with easy to find problems."""
        result = list()
        if self.speed() > self.moving_speed():
            result.append('Speed {:.3f} must not be above Moving speed {:.3f}'.format(
                self.speed(), self.moving_speed()))
        if self.category == 'Cycling':
            if not 3 <= self.speed() <= 60:
                result.append('Speed {:.3f} is out of expected range 3..60'.format(self.speed()))
            if not 10 <= self.moving_speed() <= 50:
                result.append('Moving speed {:.3f} is out of expected range 10..50'.format(self.moving_speed()))
        if self.category == 'Cycling - MTB':
            if not 3 <= self.speed() <= 50:
                result.append('Speed {:.3f} is out of expected range 3..50'.format(self.speed()))
            if not 10 <= self.moving_speed() <= 40:
                result.append('Moving speed {:.3f} is out of expected range 10..40'.format(self.moving_speed()))
        return result

    def __repr__(self) -> str:
        """The repr.

        Returns:
            the repr str

        """
        with self._decouple():
            # this should not automatically load the entire track
            parts = []
            parts.append('public' if self.public else 'private')
            if self.__gpx:
                parts.append(self.category)
                if self.keywords:
                    parts.append(','.join(self.keywords))
                if self.title:
                    parts.append(self.title)
                if self.time and self.last_time:
                    parts.append(repr_timespan(self.time, self.last_time))
                elif self.time:
                    parts.append(str(self.time))
                if 'distance' in self._header_data:
                    parts.append('{:4.2f}km'.format(self._header_data['distance']))
                elif self.gpx.get_track_points_no() or self._loaded:
                    parts.append('{} points'.format(self.gpx.get_track_points_no()))
            return '{}({})'.format(str(self), ' '.join(parts))

    def __str__(self) -> str:
        """The str.

        Returns:
            a unique full identifier

        """
        if self.backend is None:
            return 'unsaved: "{}" time={} id={}'.format(self.title or 'untitled', self.time, id(self))
        ident = self.id_in_backend or 'unsaved'
        if str(self.backend) == '.':
            # current directory
            return ident
        return '{}/{}'.format(self.backend, ident)

    def key(self, with_category: bool = True, with_last_time: bool = True, precision=None) -> str:
        """For speed optimized equality checks, not granted to be exact, but sufficiently safe IMHO.

        Args:
            with_category: If False, do not use self.category. Needed for comparing
                tracks for equality like in unittests because values can change and
                information can get lost while copying between different backends.
            with_last_time: If False, do not use self.last_time.
            precision: For latitude/longitude. After comma digits. Default is as
                defined by backend or 6.

        Returns:
            a string with selected attributes in printable form.

        """
        self._load_full()
        return 'title:{} description:{} keywords:{} category:{}: public:{} last_time:{} angle:{} points:{}'.format(
            self.title, self.description,
            ','.join(self.keywords), self.category if with_category else '',
            self.public, self.last_time if with_last_time else '',
            self.angle(precision), self.gpx.get_track_points_no())

    def __eq__(self, other) -> bool:
        """equal.

        Returns:
            result

        """
        if self is other:
            return True
        return self.key() == other.key()

    def __lt__(self, other) -> bool:
        """less than.

        Returns:
            result

        """
        return self.key() < other.key()

    def distance(self) -> float:
        """For me, the earth is flat.

        Returns:
            the distance in km, rounded to m. 0.0 if not computable.
            # TODO: needs unittest

        """
        if 'distance' in self._header_data:
            return self._header_data['distance']
        if self.__cached_distance is None:
            self.__cached_distance = round(gpx_length(list(self.points())) / 1000, 3)
        return self.__cached_distance or 0.0

    def angle(self, precision=None) -> float:
        """For me, the earth is flat.

        Args:
            precision: After comma digits. Default is as defined by backend or 6.

        Returns:
            the angle in degrees 0..360 between start and end.
            If we have no track, return 0.

        """
        try:
            first_point = next(self.points())
        except StopIteration:
            return 0
        last_point = self.last_point()
        if precision is None:
            if self.backend is None:
                precision = 6
            else:
                precision = self.backend.point_precision
        delta_lat = round(first_point.latitude, precision) - round(last_point.latitude, precision)
        delta_long = round(first_point.longitude, precision) - round(last_point.longitude, precision)
        norm_lat = delta_lat / 90.0
        norm_long = delta_long / 180.0
        try:
            result = degrees(asin(norm_long / sqrt(norm_lat**2 + norm_long ** 2)))
        except ZeroDivisionError:
            return 0
        if norm_lat >= 0.0:
            return (360.0 + result) % 360.0
        return 180.0 - result
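
    # Worked example (illustrative, not part of the original module): for a track
    # whose first point is at (lat 0.0, lon 0.0) and whose last point is at
    # (lat 1.0, lon 0.0) - it ends due north of its start - delta_lat is -1 and
    # delta_long is 0, so asin(0) == 0 and, because norm_lat is negative, angle()
    # returns 180.0. The value thus behaves roughly like a compass bearing from
    # the end point back to the start point.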

    def segments(self):
        """A generator over all segments.

        Yields:
            GPXTrackSegment: all segments in all tracks

        """
        self._load_full()
        for track in self.__gpx.tracks:
            for segment in track.segments:
                yield segment

    def points(self):
        """A generator over all points.

        Yields:
            GPXTrackPoint: all points in all tracks and segments

        """
        for segment in self.segments():
            for point in segment.points:
                yield point

    def point_list(self):
        """A flat list with all points.

        Returns:
            The list

        """
        return sum((x.points for x in self.segments()), [])

    def last_point(self):
        """Return the last point of the track. None if none."""
        # TODO: unittest for track without __gpx or without points
        try:
            return self.gpx.tracks[-1].segments[-1].points[-1]
        except IndexError:
            return None

    def adjust_time(self, delta):
        """Add a timedelta to all times.

        gpxpy.gpx.adjust_time does the same but it ignores waypoints.
        A newer gpxpy.py has a new bool arg for adjust_time which also adjusts
        waypoints on request, but I do not want to check versions.

        """
        self.gpx.adjust_time(delta)
        for wpt in self.gpx.waypoints:
            wpt.time += delta
        self._dirty = 'gpx'

    def points_hash(self) -> float:
        """A hash that is hopefully different for every possible track.

        It is built using the combination of all points.

        Returns:
            The hash

        """
        self._load_full()
        result = 1.0
        for point in self.points():
            if point.longitude:
                result *= point.longitude
            if point.latitude:
                result *= point.latitude
            if point.elevation:
                result *= point.elevation
            _ = point.time
            if _:
                result *= (_.hour + 1)
                result *= (_.minute + 1)
                result *= (_.second + 1)
            result %= 1e20
        return result

    def points_equal(self, other, digits=4) -> bool:
        """Compare points for same position.

        Args:
            digits: Number of after comma digits to compare

        Returns:
            True if both tracks have identical points.

        All points of all tracks and segments are combined. Elevations are ignored.

        """
        # We do not use points_hash because we want to abort as soon as we know
        # they are different.
        if self.gpx.get_track_points_no() != other.gpx.get_track_points_no():
            return False
        for _, (point1, point2) in enumerate(zip(self.points(), other.points())):
            if not positions_equal(point1, point2, digits):
                return False
        return True

    def index(self, other, digits=4):
        """Check if this track contains other track.gpx.

        This only works if all values for latitude and longitude are nearly identical.

        Useful if one of the tracks had geofencing applied.

        Args:
            digits: How many after comma digits are used

        Returns:
            None or the starting index for other.points in self.points

        """
        self_points = self.point_list()
        other_points = other.point_list()
        for self_idx in range(len(self_points) - len(other_points) + 1):
            for other_idx, other_point in enumerate(other_points):
                if not positions_equal(self_points[self_idx + other_idx], other_point, digits):
                    break
            else:
                return self_idx
        return None
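
    # Illustrative usage sketch (not part of the original module; "fenced_track"
    # is a hypothetical copy of "track" whose points near home were removed):
    # the remaining contiguous point sequence should still be found in the full track.
    #
    #     start = track.index(fenced_track)
    #     if start is not None:
    #         print('fenced copy starts at point', start)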

    @staticmethod
    def overlapping_times(tracks):
        """Find tracks with overlapping times.

        Yields:
            groups of tracks with overlapping times. Sorted by time.

        This may be very slow for many long tracks.

        """
        previous = None
        group = list()  # Track is mutable, so a set is not possible
        for current in sorted(tracks, key=lambda x: x.time):
            if previous and current.time <= previous.last_time:
                if previous not in group:
                    group.append(previous)
                group.append(current)
            else:
                if group:
                    yield sorted(group, key=lambda x: x.time)
                    group = list()
            previous = current
        if group:
            yield sorted(group, key=lambda x: x.time)
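
    # Illustrative usage sketch (not part of the original module; "directory" is a
    # hypothetical backend that can be iterated over its tracks): report groups of
    # tracks recorded at overlapping times, e.g. duplicates or split recordings.
    #
    #     for group in Track.overlapping_times(directory):
    #         print('overlapping:', ', '.join(str(x) for x in group))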

    def _has_default_title(self) -> bool:
        """Try to check if track has the default title given by a backend.

        Returns:
            True if so

        """
        # the title of MMT might have been copied into another backend:
        if not self.title:
            return True
        if self.title == '{} track'.format(self.category):
            return True
        if all(x in '0123456789 :-_' for x in self.title):
            return True
        return False

    def __merge_metadata(self, other, dry_run):
        """Merge metadata from other.

        Used only by self.merge.

        Returns:
            a list of verbosity messages

        """
        msg = list()
        if not other._has_default_title() and self._has_default_title():
            msg.append('Title: {} -> {}'.format(self.title, other.title))
            if not dry_run:
                self.title = other.title
        if other.description and other.description != self.description:
            msg.append('Additional description: {}'.format(other.description))
            if not dry_run:
                self.description += '\n'
                self.description += other.description
        if other.public and not self.public:
            msg.append('Visibility: private -> public')
            if not dry_run:
                self.public = True
        if other.category != self.category:
            msg.append('Category: {}={} wins over {}={}'.format(
                other, other.category, self, self.category))
        kw_src = set(other.keywords)
        kw_dst = set(self.keywords)
        if kw_src - kw_dst:
            msg.append('New keywords: {}'.format(','.join(kw_src - kw_dst)))
            if not dry_run:
                self.keywords = kw_src | kw_dst
        return msg

    def can_merge(self, other, partial_tracks: bool = False):
        """Check if self and other are mergeable.

        Args:
            other: The other Track
            partial_tracks: If True, they are mergeable if one of them contains the other one.

        Returns:
            (int, str): int is either None or the starting index of the shorter
            track in the longer track, str is either None or a string explaining
            why this is not mergeable

        """
        if str(self) == str(other):
            return None, 'Cannot merge identical tracks {}'.format(self)
        if (other.gpx.get_track_points_no()) == 0 and other.gpx.waypoints:
            # mergeable
            return 0, None
        if partial_tracks:
            other_in_self = self.index(other)
            if other_in_self is not None:
                return other_in_self, None
            self_in_other = other.index(self)
            if self_in_other is not None:
                return self_in_other, None
        elif self.points_equal(other):
            return 0, None
        return None, (
            'Cannot merge {} with {} points into {} with {} points'.format(
                other, other.gpx.get_track_points_no(),
                self, self.gpx.get_track_points_no()))

    def __merge_waypoints(self, other, dry_run) -> list:
        """Merge waypoints from other with differing positions.

        Returns:
            list(str) Messages about what has been done.

        """
        # TODO: unittest
        def have(wpt):
            """True if we already have this waypoint."""
            return (wpt.latitude, wpt.longitude) in [(x.latitude, x.longitude) for x in new_wpts]

        merged_count = 0
        new_wpts = self.gpx.waypoints[:]
        for other_wpt in other.gpx.waypoints:
            if not have(other_wpt):
                new_wpts.append(other_wpt)
                if not dry_run:
                    self.gpx.waypoints.append(other_wpt)
                    self.rewrite()
                merged_count += 1
        if merged_count > 0:
            return ['{} got {} waypoints from {}'.format(self, merged_count, other)]
        return []

    def __merge_tracks(self, other, dry_run, shorter_at) -> list:
        """Merge tracks from other.

        Returns:
            list(str) Messages about what has been done.

        """
        msg = []
        if other.gpx.get_track_points_no() > self.gpx.get_track_points_no():
            if not dry_run:
                self.gpx.tracks = deepcopy(other.gpx.tracks)
                self.rewrite()
            msg.append('{} got entire gpx.tracks from {}'.format(self, other))
        changed_point_times = 0
        self_points = self.point_list()[shorter_at:]
        for self_point, other_point in zip(self_points, other.points()):
            # TODO: unittest with shorter track
            if not self_point.time:
                if not dry_run:
                    self_point.time = other_point.time
                changed_point_times += 1
        if changed_point_times:
            if not dry_run:
                self.rewrite()
            msg.append('Copied times for {} out of {} points'.format(
                changed_point_times, self.gpx.get_track_points_no()))
        return msg

    def merge(  # noqa pylint: disable=unused-argument
            self, other, remove: bool = False, dry_run: bool = False,
            copy: bool = False, partial_tracks: bool = False) -> list:
        """Merge other track into this one.

        Either the track points must be identical or the other track may only
        contain waypoints.

        If merging is not possible, raise Track.CannotMerge.

        If either is public, the result is public.
        If self.title seems like a default and other.title does not, use other.title.
        Combine description and keywords.
        Merge waypoints as defined by _merge_waypoints().

        Args:
            other (:class:`~gpxity.track.Track`): The track to be merged
            remove: After merging succeeded, remove other
            dry_run: if True, do not really apply the merge
            copy: This argument is ignored. It is only here to give Track.merge()
                and Backend.merge() the same interface.
            partial_tracks: merges other track if either track is part of the other one

        Returns:
            list(str) Messages about what has been done.

        """
        assert isinstance(other, Track)
        msg = []
        shorter_at, _ = self.can_merge(other, partial_tracks)
        if shorter_at is None:
            raise Track.CannotMerge(_)
        with self.batch_changes():
            msg.extend(self.__merge_tracks(other, dry_run, shorter_at))
            msg.extend(self.__merge_waypoints(other, dry_run))
            msg.extend(self.__merge_metadata(other, dry_run))
        if msg:
            msg = [' ' + x for x in msg]
            msg.insert(0, 'merge{} {!r}'.format(' and remove' if remove else '', other))
            msg.insert(1, '{} into {!r}'.format(' ' * len(' and remove') if remove else '', self))
        if remove:
            if len(msg) <= 2:
                msg.append(
                    'remove duplicate {!r}: It was identical with {!r}'.format(other, self))
            if not dry_run:
                other.remove()
        return msg
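
    # Illustrative usage sketch (not part of the original module): merge a duplicate
    # recording into this track and remove the duplicate afterwards.
    #
    #     try:
    #         for line in track.merge(other_track, remove=True):
    #             print(line)
    #     except Track.CannotMerge as exc:
    #         print('not mergeable:', exc)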

    @staticmethod
    def __time_diff(last_point, point):
        """Return difference in seconds, ignoring the date."""
        result = abs(last_point.hhmmss - point.hhmmss)
        if result > 33200:  # seconds in 12 hours
            result = 86400 - result
        return result

    @staticmethod
    def __point_is_near(last_point, point, delta_meter):
        """Return True if distance < delta_meter."""
        return abs(last_point.distance_2d(point)) < delta_meter

    def fix(self, orux: bool = False, jumps: bool = False):
        """Fix bugs. This may fix them or produce more bugs.

        Please backup your track before doing this.

        Args:
            orux: Older Oruxmaps switched the day back by one day after exactly 24 hours.
            jumps: Whenever the time jumps back or more than 30 minutes into the
                future, split the segment at that point.

        Returns:
            A list of message strings, usable for verbose output.

        """
        self._load_full()
        if orux:
            self.__fix_orux()
        if jumps:
            self.__fix_jumps()
        return []

    def __fix_jumps(self):  # noqa pylint: disable=too-many-branches
        """Split segments at jumps.

        Whenever the time jumps back or more than 30 minutes into the future
        or the distance exceeds 5km, split the segment at that point.

        """
        did_break = False
        new_tracks = list()
        for track in self.gpx.tracks:
            new_segments = list()
            for segment in track.segments:
                if not segment.points:
                    did_break = True  # sort of - but also needs a rewrite
                    continue
                new_segment = GPXTrackSegment()
                new_segment.points.append(segment.points[0])
                for point in segment.points[1:]:
                    prev_point = new_segment.points[-1]
                    needs_break = False
                    if point.time is None and prev_point.time is not None:
                        needs_break = True
                    elif point.time is None and prev_point.time is None:
                        if point.distance_2d(prev_point) > 5000:
                            needs_break = True
                    elif point.time is not None and prev_point.time is None:
                        needs_break = True
                    elif point.time - prev_point.time > datetime.timedelta(minutes=30):
                        needs_break = True
                    elif point.time < prev_point.time:
                        needs_break = True
                    elif point.distance_2d(prev_point) > 5000:
                        needs_break = True
                    if needs_break:
                        did_break = True
                        new_segments.append(new_segment)
                        new_segment = GPXTrackSegment()
                    new_segment.points.append(point)
                new_segments.append(new_segment)
            new_track = GPXTrack()
            new_track.segments.extend(new_segments)
            new_tracks.append(new_track)
        if did_break:
            self.gpx.tracks = new_tracks
            self._dirty = 'gpx'

    def __fix_orux(self):
        """Try to fix Oruxmaps problems.

        1. the 24h bug

        """
        all_points = list(uniq(self.points()))
        for _ in all_points:
            _.hhmmss = _.time.hour * 3600.0 + _.time.minute * 60
            _.hhmmss += _.time.second + _.time.microsecond / 1000000
        new_points = list([all_points.pop(0)])
        while all_points:
            last_point = new_points[-1]
            near_points = [x for x in all_points if self.__point_is_near(last_point, x, 10000)]
            if not near_points:
                near_points = all_points[:]
            nearest = min(near_points, key=lambda x: Track.__time_diff(last_point, x))
            new_points.append(nearest)
            all_points.remove(nearest)
        day_offset = 0
        point1 = None
        for point in new_points:
            if point1 is None:
                point1 = point
            else:
                point2 = point
                if point1.time - point2.time > datetime.timedelta(days=day_offset, hours=23):
                    day_offset += 1
                if day_offset:
                    point2.time += datetime.timedelta(days=day_offset)
                point1 = point2
        segment = GPXTrackSegment()
        segment.points.extend(new_points)
        self.__gpx.tracks = list()
        self.__gpx.tracks.append(GPXTrack())
        self.__gpx.tracks[0].segments.append(segment)
        self._dirty = 'gpx'

    def __similarity_to(self, other):
        """Return a float 0..1: 1 is identity."""

        def simple(track):
            """Simplified track."""
            return [
                (round(x.latitude, 3), round(x.longitude, 3))
                for x in simplify_polyline(list(track.points()), max_distance=50)]

        if id(other) not in self._similarity_others:
            simple1 = simple(self)
            simple2 = simple(other)
            max_len = max([len(simple1), len(simple2)])
            similar_length = 1.0 - abs(len(simple1) - len(simple2)) / max_len
            set1 = set(simple1)
            set2 = set(simple2)
            min_len = min([len(set1), len(set2)])
            similar_points = len(set1 & set2) / min_len
            result = similar_length * similar_points
            self._similarity_others[id(other)] = other
            self._similarities[id(other)] = result
            other._similarity_others[id(self)] = self
            other._similarities[id(self)] = result
        return self._similarities[id(other)]

    def similarity(self, others):
        """Return a float 0..1: 1 is identity.

        The highest value for others is returned.

        """
        if isinstance(others, Track):
            others = [others]
        return max(self.__similarity_to(x) for x in others)
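
    # Illustrative usage sketch (not part of the original module): treat two tracks
    # as probable duplicates if their simplified geometries mostly coincide; the
    # threshold 0.9 is an arbitrary choice, not something gpxity defines.
    #
    #     if track.similarity(other_track) > 0.9:
    #         print('{} looks like a duplicate of {}'.format(other_track, track))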

    def time_offset(self, other):
        """If time and last_time have the same offset between both tracks, return that time difference.

        Otherwise return None.

        """
        def offset(point1, point2):
            """Return the time delta if both points have a time."""
            if point1.time and point2.time:
                return point2.time - point1.time
            return None

        start_time_delta = offset(next(self.points()), next(other.points()))
        if start_time_delta:
            end_time_delta = offset(self.last_point(), other.last_point())
            if start_time_delta == end_time_delta:
                return start_time_delta
        return None

    @property
    def ids(self):
        """Return ids for all backends where this track has already been.

        This is a list of pairs. pair[0] is the name of the backend, pair[1] is
        the track id within. You can modify it but your changes will never be saved.

        They are sorted by ascending age. Only the 5 youngest are kept.
        If the same filename appears in more than one directory, keep only the youngest.

        Returns:
            list( (str, str)) a list of tuple pairs with str(backend) and id_in_backend

        """
        if 'ids' in self._header_data:
            result = self._header_data['ids']
        else:
            self._load_full()
            result = self._ids
        return self.__clean_ids(result)

    @staticmethod
    def __clean_ids(original):
        """Remove redundancies and old ids.

        1. if the same id_in_backend is in several directories, keep only the first one
        2. keep only 5

        Returns:
            The cleaned list of ids

        """
        result = list()
        seen = set()
        for _ in original:
            is_dir = ':' not in _[0]
            if is_dir:
                if _[1] not in seen:
                    seen.add(_[1])
                    result.append(_)
            else:
                result.append(_)
        return result[:5]

    def split(self):
        """Create separate tracks for every track/segment."""
        backend = self.backend
        self.remove()
        try:
            for segment in self.segments():
                track = self.clone()
                gpx_track = GPXTrack()
                gpx_track.segments.append(segment)
                track.gpx.tracks = [gpx_track]
                backend.add(track)
        except BaseException as exc:
            logging.error('split:%s', exc)
            backend.add(self)