#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""This module defines :class:`~gpxity.track.GpxFile`."""
# pylint: disable=protected-access
import datetime
from functools import total_ordering
from contextlib import contextmanager
import weakref
from copy import deepcopy
import logging
# pylint: disable=too-many-lines
from gpxpy import gpx as mod_gpx
from gpxpy.geo import simplify_polyline
from .gpx import Gpx
from .backend_base import BackendBase
from .util import repr_timespan
GPXTrack = mod_gpx.GPXTrack
GPXTrackSegment = mod_gpx.GPXTrackSegment
GPXXMLSyntaxException = mod_gpx.GPXXMLSyntaxException
__all__ = ['GpxFile']
[docs]@total_ordering
class GpxFile: # pylint: disable=too-many-public-methods
"""Represents a file with Gpx data.
If a :class:`~gpxity.backend.Backend` supports attributes not directly
supported by the GPX format like the MapMyTracks track type, they will
transparently be encoded in existing GPX fields like keywords, see :attr:`keywords`.
If a :class:`~gpxity.backend.Backend` (like :class:`~gpxity.backends.wptrackserver.WPTrackserver`)
does not support keywords, they will transparently be encoded in the description.
The GPX part is done by https://github.com/tkrajina/gpxpy.
If a gpxfile is assigned to a backend, all changes will by default be written directly to the backend.
Some backends are able to change only one attribute with little time overhead, others always have
to rewrite the entire gpxfile.
You can use the context manager :meth:`batch_changes`. This holds back updating the backend until
leaving the context.
If you manipulate the gpx directly, this goes unnoticed to the updating mechanism. Use :meth:`rewrite` when done.
Not all backends support everything, you could get the exception NotImplementedError.
The data will only be loaded from the backend when it is needed. Backends have two ways
of loading data: Either load a list of gpxfiles or load all information about a specific gpxfile. Often
loading the list of gpxfiles gives us some attributes for free, so listing
those gpxfiles may be much faster if you do not want everything listed.
Absolutely all attributes (like :attr:`title`, :attr:`distance`) are encoded in :attr:`gpx`.
However you can always assign values to them even if :attr:`gpx` is None. As soon
as :attr:`gpx` is given, it will be updated.
All points are always rounded to 6 decimal digits when they are added to the track.
However some backends may support less than 6 decimals. You can query Backend.point_precision.
Args:
gpx (Gpx): Initial content. Can be used if you create a new GpxFile from scratch without
loading it from some backend.
Attributes:
categories (tuple(str)): The legal values for :attr:`~GpxFile.category`. The first one is used
as default value. This is a superset of the values for the different backends.
Every backend maps from its internal values into those when reading and maps them
back when writing. Since not all backends support all values defined here and since
some backends may define more values than we know, information may get lost when
converting.
"""
# pylint: disable = too-many-instance-attributes,too-many-public-methods
    class CannotMerge(Exception):
        """Is raised if :meth:`GpxFile.merge() <gpxity.track.GpxFile.merge>` fails."""
categories = (
'Cycling', 'Cycling - Road', 'Cycling - Gravel', 'Cycling - MTB', 'Cycling - Indoor', 'Cycling - Hand',
'Cycling - Touring', 'Cycling - Foot',
'Running', 'Running - Trail', 'Running - Urban Trail', 'Running - Road',
'Sailing', 'Walking', 'Hiking', 'Hiking - Speed',
'Swimming', 'Driving', 'Off road driving', 'Motor racing', 'Motorcycling', 'Enduro',
'Skiing', 'Skiing - Touring', 'Skiing - Backcountry', 'Skiing - Crosscountry', 'Skiing - Nordic',
'Skiing - Alpine', 'Skiing - Roller',
'Canoeing', 'Kayaking', 'Sea kayaking', 'Stand up paddle boarding',
'Rowing', 'Windsurfing', 'Kiteboarding', 'Orienteering', 'Mountaineering', 'Skating',
'Skateboarding', 'Horse riding', 'Hang gliding', 'Gliding', 'Flying', 'Snowboarding',
'Paragliding', 'Hot air ballooning', 'Nordic walking', 'Snowshoeing', 'Jet skiing', 'Powerboating',
'Swimrun',
'Pedelec', 'Crossskating', 'Motorhome', 'Cabriolet', 'Coach',
'Pack animal trekking', 'Train', 'Wheelchair', 'Sightseeing', 'Geocaching', 'Longboard',
'River navigation',
'Skating - Inline',
'Wintersports',
'Miscellaneous')
_obsolete_categories = {
'Mountain biking': 'Cycling - MTB',
}
def __init__(self, gpx=None):
"""See class docstring."""
if gpx is None:
gpx = Gpx()
assert isinstance(gpx, Gpx)
self.__dirty = list()
self._batch_changes = False
self.__ids = list() # TODO: remove
self.__id_in_backend = None
self.__backend = None
self.__gpx = None
self.__backend = None
self.__cached_time = None
self.__cached_distance = None
# those are read from the backend but writing them to the same
# backend would fence them away. We want to avoid simple
# in-place operations removing positions.
self._illegal_points = None
# __header_cache holds all attributes that are set while gpx is None.
# After gpx is given, if an attribute is not set in gpx, remove it from __header_cache
# and write it into gpx. Do that for all values from __header_cache before writing
# to the backend.
self.__header_cache = dict()
self._similarity_others = weakref.WeakValueDictionary() # other Gpxfiles pointing back to us
self._similarities = dict()
self.__without_fences = None # used by context manager "fenced()"
self.gpx = gpx
    def __decode_gpx(self):
        """Extract attributes from gpx.

        This is done whenever gpx changes.
        We have 3 groups of attributes depending on how often
        they are needed and how expensive their computation is,
        in ascending order:
        1. always read from and write to gpx. Examples: title, description
        2. cache now. Examples: category (really? or maybe only 1. and 3.?)
        3. lazy: compute and cache only when needed. Examples: distance, time
        Those are set to None here.
        If an attribute is changed, the full gpxfile is always loaded first.
        """
        self.__gpx.decode()
        # lazy attributes: invalidate the caches so they are recomputed on next access
        self.__cached_distance = None
        self.__cached_time = None
        self._clear_similarity_cache()

    def __encode_gpx(self):
        """Put values into gpx. See __decode_gpx.

        If a backend is attached, the category is translated into the
        backend's own category vocabulary before writing.
        """
        self.__gpx.encode()
        if self.backend:
            self.__gpx.category = self.backend.encode_category(self.__gpx.category)
@property
def backend(self):
"""The backend this gpxfile lives in. If the gpxfile was constructed in memory, backend is None.
This is a read-only property. It is set with :meth:`Backend.add <gpxity.backend.Backend.add>`.
It is not possible to decouple a gpxfile from its backend, use :meth:`clone()`.
Returns:
The backend
"""
return self.__backend
# :attr:`GpxFile.id_in_backend <gpxity.gpxfile.GpxFile.id_in_backend>`.
@property
def id_in_backend(self) ->str:
"""Every backend has its own scheme for unique ids.
Some backends may change this if the gpxfile data changes.
Some backends support assigning a new value. Those are
currently :class:`~gpxity.backends.directory.Directory` and
:class:`~gpxity.backends.wptrackserver.WPTrackserver`.
The others will raise NotImplementedError.
See also :meth:`Backend.add() <gpxity.backend.Backend.add>`.
Returns:
the id in the backend
"""
return self.__id_in_backend
    @id_in_backend.setter
    def id_in_backend(self, value: str) -> None:
        """Change the id in the backend.

        Illegal changes raise ValueError.

        Args:
            value: The new value

        """
        BackendBase._check_id_legal(value)  # generic legality check
        if self.__backend:
            # the concrete backend may impose additional restrictions
            self.__backend._check_id_legal(value)
        if self.__id_in_backend == value:
            return
        if self.__id_in_backend:
            # remember the previous full identifier
            self.__ids.insert(0, str(self))
        if self.__is_decoupled:
            # internal use: the backend itself is assigning the id
            self.__id_in_backend = value
        else:
            if not self.__id_in_backend:
                raise ValueError('Cannot set id_in_backend for yet unsaved gpxfile {}'.format(self))
            if not value:
                raise ValueError('Cannot remove id_in_backend for saved gpxfile {}'.format(self))
            with self._decouple():
                # let the backend do the renaming; it updates our id
                self.backend._change_ident(self, value)
    def _set_backend(self, value):
        """To be used only by backend implementations.

        Attaches this gpxfile to backend *value*. If the backend class
        changes, keywords are re-encoded because every backend class has
        its own keyword encoding.
        """
        assert self.__is_decoupled
        assert value is not self.__backend
        if self.__backend:
            self.__backend._check_id_legal(self.id_in_backend)
        self._illegal_points = 0  # when copying or moving a GpxFile, it is OK to lose points
        old_backend = self.__backend
        self.__backend = value
        if self.__gpx.keywords:
            if old_backend and old_backend.__class__ != value.__class__:
                # encode keywords for the new backend
                # TODO: unittest
                self.__gpx.encode()
                self.change_keywords(self.__gpx.real_keywords)
        self.__gpx.default_country = self.backend.account.country if self.backend else None
[docs] def rewrite(self) ->None:
"""Call this after you directly manipulated :attr:`gpx`."""
if not self.__gpx.is_complete:
raise Exception('GpxFile.rewrite: The gpxfile must already be loaded fully')
self._dirty = 'gpx'
    @property
    def _dirty(self) -> list:
        """
        Check if the gpxfile is in sync with the backend.

        Setting :attr:`_dirty` will directly write the changed data into the backend.

        :attr:`_dirty` can receive an arbitrary string like 'title'. If the backend
        has a method _write_title, that one will be called. Otherwise the
        entire gpxfile will be written by the backend.

        Returns:
            list: The names of the attributes currently marked as dirty.

        """
        return self.__dirty

    @_dirty.setter
    def _dirty(self, value):
        """See dirty.getter."""
        if not isinstance(value, str):
            raise Exception('_dirty only receives str')
        if self._illegal_points:
            # points were fenced away while reading; writing back would lose them for good
            raise Exception(
                'Cannot modify {}: Fences removed {} points while reading from backend. '
                'Use gpxdo fix --refence'.format(self, self._illegal_points))
        if value == 'gpx':
            if self.__without_fences is not None:
                raise Exception(
                    '{}: You may not modify gpx while being in the context manager "fenced()"'.format(self))
            # the full gpx changed: re-extract the cached attributes
            self.__decode_gpx()
        if not self.__is_decoupled:
            self.__dirty.append(value)
            if not self._batch_changes:
                self._rewrite()
    def _clear_similarity_cache(self):
        """Clear similarities cache, also in the other similar gpxfiles."""
        # iterate over a list copy: the WeakValueDictionary may shrink while we work
        for other in list(self._similarity_others.values()):
            if id(self) in other._similarity_others:
                del other._similarity_others[id(self)]
            # TODO: unittest where other does not exist anymore
            if id(self) in other._similarities:
                del other._similarities[id(self)]
        self._similarity_others = weakref.WeakValueDictionary()
        self._similarities = dict()
def _clear_dirty(self):
"""To be used by the backend when saving."""
self.__dirty = list()
    def clone(self):
        """Create a new gpxfile with the same content but without backend.

        Returns:
            ~gpxity.gpxfile.GpxFile: the new gpxfile

        """
        self._load_full()
        result = GpxFile(gpx=self.gpx.clone())
        if self.backend:
            # remember where the clone came from
            result.__ids.insert(0, str(self))
        return result
    def _rewrite(self):
        """Rewrite all changes in the associated backend.

        If any of those conditions is met, do nothing:

        - we are currently loading from backend: Avoid recursion
        - batch_changes is active
        - we have no backend

        Otherwise the backend will save this gpxfile.
        """
        if 'gpx' in self.__dirty:
            # the full gpx changed: re-extract the cached attributes first
            self.__decode_gpx()
        if not self.backend:
            # nothing to sync with: just forget the pending changes
            self._clear_dirty()
        if not self.__dirty:
            return
        if 'write' not in self.backend.supported:
            # TODO: unittest
            raise Exception('Rewriting {}: "write" is not supported'.format(self))
        if not self.__is_decoupled and not self._batch_changes:
            with self._decouple():
                # encoding may change gpx.keywords; restore them afterwards
                old_gpx_keywords = self.__gpx.keywords
                try:
                    self.__encode_gpx()
                    self.backend._rewrite(self, self.__dirty)
                finally:
                    self.__gpx.keywords = old_gpx_keywords
            self._clear_dirty()
[docs] def remove(self):
"""Remove this gpxfile in the associated backend.
If the gpxfile is not coupled with a backend, raise an Exception.
"""
if not self.backend:
raise Exception('{}: Removing needs a backend'.format(self))
self.backend.remove(self.id_in_backend)
    @property
    def first_time(self) -> datetime.datetime:
        """datetime.datetime: start time of gpxfile in UTC.

        For a simpler implementation of backends, notably :class:`~gpxity.backends.mmt.MMT`
        we ignore gpx.time. Instead we return the time of the earliest track point.
        Only if there is no track point, return gpx.time. If that is unknown
        too, return None.

        For the same reason time is readonly.

        We assume that the first point comes first in time and the last
        point comes last in time. In other words, points should be ordered
        by their time.
        """
        if self.__cached_time is None:
            # cheap attempt first: use what is already loaded, no backend access
            self.__cached_time = self.__gpx.first_time
        if self.__cached_time is None:
            # still unknown: self.gpx (the property) triggers a full load from the backend
            self.__cached_time = self.gpx.first_time
        return self.__cached_time
@property
def distance(self) ->float:
"""For me, the earth is flat.
This property can only be set while the full gpxfile has not yet
been loaded. The setter is used by the backends when scanning for all gpxfiles.
Returns:
the distance in km, rounded to m. 0.0 if not computable.
"""
if self.__cached_distance is None:
self.__cached_distance = self.gpx.distance
return self.__cached_distance
@distance.setter
def distance(self, value):
"""The setter."""
if self.__gpx.is_complete:
raise Exception('Setting GpxFile.distance is only allowed while the full gpxfile has not yet been loaded')
self.__cached_distance = value
@property
def title(self) -> str:
"""str: The title.
Returns:
the title
"""
if self.__gpx.name == Gpx.undefined_str:
self._load_full()
if self.__gpx.name == Gpx.undefined_str:
self.__gpx.name = ''
return self.__gpx.name
@title.setter
def title(self, value: str):
"""see getter."""
if self.__gpx.name != value:
self._load_full()
self.__gpx.name = value
self._dirty = 'title'
@property
def description(self) ->str:
"""str: The description.
Returns:
The description
"""
if self.__gpx.description == Gpx.undefined_str:
self._load_full()
if self.__gpx.description == Gpx.undefined_str:
self.__gpx.description = ''
return self.__gpx.description
@description.setter
def description(self, value: str):
"""see getter."""
if self.__gpx.description != value:
self._load_full()
self.__gpx.description = value
self._dirty = 'description'
[docs] @contextmanager
def fenced(self, fences=None):
# TODO: fences=None uses self.backend.account.fences
"""Suppress points in fences.
While this context manager is running, suppressed points are
not visible.
"""
if fences is None:
fences = self.backend.account.fences
if not fences:
yield
return
if self.__without_fences is not None:
raise Exception('fenced() is already active')
without_fences = dict()
try:
old_points = self.__gpx.get_track_points_no()
old_illegals = self._illegal_points
self._illegal_points = 0
for track_idx, track in enumerate(self.__gpx.tracks):
for seg_idx, segment in enumerate(track.segments):
without_fences[(track_idx, seg_idx)] = segment.points
segment.points = [x for x in segment.points if fences.outside(x)]
all_waypoints = self.__gpx.waypoints
self.__gpx.waypoints = [x for x in self.__gpx.waypoints if fences.outside(x)]
self._clear_similarity_cache()
new_points = self.__gpx.get_track_points_no()
if new_points < old_points:
logging.info('%s: Fencing removed %s points', self, old_points - new_points)
yield
finally:
for (track_idx, seg_idx), points in without_fences.items():
self.__gpx.tracks[track_idx].segments[seg_idx].points = points
self.__gpx.waypoints = all_waypoints
self._clear_similarity_cache()
self.__without_fences = None
self._illegal_points = old_illegals
@contextmanager
def _decouple(self):
"""Context manager: disable automic synchronization with the backend.
In that state, automatic writes of changes into
the backend are disabled, and if you access attributes which
would normally trigger a full load from the backend, they will not.
(The latter is used by __str__ and __repr__).
"""
from_backend = self.__backend
prev_value = from_backend._decoupled if from_backend else None
if from_backend:
from_backend._decoupled = True
try:
yield
finally:
if from_backend:
from_backend._decoupled = prev_value
@property
def __is_decoupled(self):
"""True if we are currently decoupled from the backend.
In that state, changes to GpxFile are not written to the backend and
the gpxfile is not marked dirty.
Returns:
True if we are decoupled
"""
if self.backend:
return self.backend._decoupled
return True
    @contextmanager
    def batch_changes(self):
        """Context manager: disable the direct update in the backend and save the entire gpxfile when done.

        This may or may not make things faster.
        :class:`~gpxity.backends.directory.Directory` and
        :class:`~gpxity.backends.gpsies.GPSIES` profit from this,
        :class:`~gpxity.backends.mmt.MMT` maybe.
        """
        prev_batch_changes = self._batch_changes
        self._batch_changes = True
        try:
            yield
        finally:
            self._batch_changes = prev_batch_changes
            # flush everything that accumulated while batching
            self._rewrite()
@property
def category(self) ->str:
"""str: What is this gpxfile doing? If we have no current value, return the default.
The value is automatically translated between our internal value and
the value used by the backend. This happens when reading from
or writing to the backend. Here we return always the internal value.
Returns:
The current value or the default value (see :attr:`categories`)
"""
if self.__gpx.category == Gpx.undefined_str:
self._load_full()
if self.__gpx.category == Gpx.undefined_str:
return self.categories[0]
if self.backend:
return self.backend.decode_category(self.__gpx.category)
return self.__gpx.category
def __default_category(self) ->str:
"""The default for either an unsaved gpxfile or for the corresponding backend.
Returns: The category
"""
if self.backend:
return self.backend.decode_category(self.backend.supported_categories[0])
return self.categories[0]
    @category.setter
    def category(self, value: str):
        """see getter.

        Args:
            value: The new category. None resets to the default category.

        Raises:
            Exception: if value is not in :attr:`categories`.

        """
        if value is None:
            value = self.__default_category()
        if value not in self.categories:
            raise Exception('Category {} is not known'.format(value))
        if value != self.__gpx.category:
            self._load_full()
            self.__gpx.category = value
            # translate into the backend's vocabulary before marking dirty
            self.__encode_gpx()
            self._dirty = 'category'
    def _load_full(self):
        """Load the full gpxfile from source_backend if not yet loaded and if not decoupled.

        The backend may

        - add values to self.__gpx
        - replace self.__gpx, see gpx.setter

        The backend is allowed and expected to replace already known values. This may
        happen

        - if the backend has a mistake and returns different values in the list of the gpxfile
          and in the full downloaded gpxfile
        - if somebody else changed the gpxfile in the backend meanwhile

        Returns: True for success
        """
        # NOTE(review): the docstring promises a return value but nothing is
        # ever returned here — confirm which one is intended.
        if (self.backend and self.id_in_backend and not self.__gpx.is_complete
                and not self.__is_decoupled and 'scan' in self.backend.supported):  # noqa
            self.backend._read_all_decoupled(self)
[docs] def add_points(self, points) ->None:
"""Round and add points to last segment in the last gpxfile.
If no track is allocated yet and points is not an empty list, allocates a track.
Args:
points (list(GPXTrackPoint):
The points to be added
"""
if points:
self._round_points(points)
self.gpx.add_points(points)
self._dirty = 'gpx'
def __decode_category(self, value) -> str:
"""Helper for _decode_keywords.
Returns:
A value out of GpxFile.categories
"""
return self.backend.decode_category(value) if self.backend else value
@staticmethod
def _round_points(points):
"""Round points to 6 decimal digits because some backends may cut last digits.
Gpsies truncates to 7 digits. The points are rounded in place!
Args:
points (list(GPXTrackPoint): The points to be rounded
"""
for _ in points:
_.longitude = round(_.longitude, 6)
_.latitude = round(_.latitude, 6)
    def xml(self) -> str:
        """Produce exactly one line per trackpoint for easier editing (like removal of unwanted points).

        Returns: The xml string.

        """
        self._load_full()
        if self.__is_decoupled:
            # if Backend._write_all() calls this, everything is already encoded in __gpx
            # NOTE(review): the comment above and the encode call below look
            # contradictory — confirm whether this branch really must encode.
            logging.info('Gpxfile.xml with encode: has ids %s in keywords %s', self.__gpx.ids, self.__gpx.keywords)
            self.__encode_gpx()
        else:
            logging.info('Gpxfile.xml() without encode: has ids %s in keywords %s',
                         self.__gpx.ids, self.__gpx.keywords)
        return self.__gpx.xml()
    @property
    def public(self):
        """
        bool: Is this a private gpxfile (can only be seen by the account holder) or is it public?.

        Default value is False.

        Returns:
            True if gpxfile is public, False if it is private

        """
        if self.__gpx.keywords == Gpx.undefined_str:
            # public is stored as a keyword (see the setter), so unknown
            # keywords mean we must load the full gpxfile first
            self._load_full()
        if self.__gpx.public == Gpx.undefined_str:
            return False
        return self.__gpx.public

    @public.setter
    def public(self, value):
        """Store this flag as keyword 'public'."""
        if value not in (True, False):
            raise ValueError('public must be True or False, I got {}'.format(value))
        if value != self.__gpx.public:
            self._load_full()
            self.__gpx.public = value
            self.__gpx.encode()  # TODO: is this correct?
            self._dirty = 'public'
    @property
    def gpx(self) -> Gpx:
        """
        Direct access to the Gpx object.

        If you use it to change its content

        * remember to call :meth:`rewrite` afterwards.
        * Since everything is stored in Gpx, all attributes like
          :attr:`description`, :attr:`public` ... will change too.

        Returns:
            the Gpx object

        """
        self._load_full()
        return self.__gpx

    @gpx.setter
    def gpx(self, value):
        """Assign new gpx."""
        self.__gpx = value
        self._clear_similarity_cache()
        if self.__is_decoupled:
            # decoupled: gpx comes from a backend, extract our attributes from it
            self.__decode_gpx()
        else:
            # coupled: push our attribute values into the new gpx
            self.__encode_gpx()
        # normalize point precision, see _round_points
        self._round_points(self.points())
@property
def last_time(self) ->datetime.datetime:
"""The last time we received in UTC.
Returns:
The last time we received so far. If none, return None.
"""
return self.gpx.last_time
@property
def keywords(self):
"""list(str): represent them as a sorted list - in GPX they are comma separated.
Content is whatever you want.
Because the GPX format does not have attributes for everything used by all backends,
we encode some of the backend arguments in keywords.
Example for mapmytracks: keywords = 'Status:public, Category:Cycling'.
Gpxity expects keywords to be separated by ",". When writing them Gpxity
uses ", " (with a space after the comma) as separator.
However this is transparent for you. When parsing theGPX file, those are removed
from keywords, and the are re-added in when exporting in :meth:`xml`. So
:attr:`GpxFile.keywords` will never show those special values.
Some backends may change keywords. :class:`~gpxity.backends.mmt.MMT` converts the
first character into upper case and will return it like that. Gpxity will not try to hide such
problems. So if you save a gpxfile in :class:`~gpxity.backends.mmt.MMT`, its keywords
will change. But they will not change if you copy from :class:`~gpxity.backends.mmt.MMT`
to :class:`~gpxity.backends.directory.Directory` - so if you copy from DirectoryA
to :class:`~gpxity.backends.mmt.MMT` to DirectoryB, the keywords in
DirectoryA and DirectoryB will not be identical, for example "berlin" in DirectoryA but
"Berlin" in DirectoryB.
"""
return self.gpx.real_keywords
@keywords.setter
def keywords(self, values):
"""Replace all keywords.
Args:
Either single str with one or more keywords, separated by commas
or an iterable of keywords. The new keywords. Must not have duplicates.
"""
self.change_keywords(values, replace=True)
@staticmethod
def _check_keyword(keyword):
"""Must not be one of our internally used codes."""
internal = (('Category', 'category'), ('Status', 'public'), ('Id', 'ids'))
for internal_kw, attr in internal:
if keyword.startswith(internal_kw + ':'):
raise Exception('Do not use {} directly, use GpxFile.{}'.format(internal_kw, attr))
if ',' in keyword:
raise Exception('No comma allowed within a keyword')
def __prepare_keywords(self, values):
"""Common introductory code for change_keywords.
The values may be preceded with a '-' which will be preserved in the result.__ids
Args:
values: Either single str with one or more keywords, separated by commas
or an iterable of keywords.
Returns:
A set of legal keywords as expected by the backend.
"""
if isinstance(values, str):
lst = [x.strip() for x in values.split(',')]
else:
lst = list(values)
pairs = list()
for _ in lst:
if _.startswith('-'):
pairs.append((False, _[1:]))
else:
pairs.append((True, _))
for _ in pairs:
self._check_keyword(_[1])
if self.backend:
pairs = [(x[0], self.backend._encode_keyword(x[1])) for x in pairs]
adding = {x[1] for x in pairs if x[0]}
removing = {x[1] for x in pairs if not x[0]}
return adding, removing
    def change_keywords(self, values, replace=False, dry_run=False):
        """Change keywords.

        Duplicate keywords are silently ignored.
        A keyword may not contain a comma.
        Keywords with a preceding '-' are removed, the others are added.
        Raise an Exception if a keyword is both added and removed.

        Args:
            values: Either a single str with one or more keywords, separated by commas
                or an iterable of keywords
            replace: if True, replace current keywords with the new ones. Ignores keywords
                preceded with a '-'.
            dry_run: if True, only return the new keywords but do not make that change

        Returns:
            The new keywords

        """
        self._load_full()
        have = {*self.keywords}
        add, remove = self.__prepare_keywords(values)
        if add & remove:
            raise Exception('Cannot both add and remove keywords {}'.format(add & remove))
        if replace:
            # everything not explicitly added is removed
            remove = have - add
        else:
            # only add what is new, only remove what we actually have
            add -= have
            remove &= have
        new = sorted((have | add) - remove)
        if new == self.keywords:
            # nothing to do
            return self.keywords
        logging.info('%s: Keywords: %s -> %s', self, self.keywords, new)
        if not dry_run:
            self.__gpx.real_keywords = new
            with self.batch_changes():
                if remove:
                    self._dirty = 'remove_keywords{}{}'.format(BackendBase._dirty_separator, ', '.join(remove))
                if add:
                    self._dirty = 'add_keywords{}{}'.format(BackendBase._dirty_separator, ', '.join(add))
            assert new == self.keywords, (
                'change_keywords failed. Expected: {}, got: {}'.format(new, self.keywords))
        return new
[docs] def speed(self) ->float:
"""Speed over the entire time in km/h or 0.0.
Returns:
The speed
"""
return self.gpx.speed()
[docs] def moving_speed(self) ->float:
"""Speed for time in motion in km/h.
Returns:
The moving speed
"""
return self.gpx.moving_speed()
[docs] def warnings(self):
"""Return a list of strings with easy to find problems."""
result = list()
if self.last_time:
speed = self.speed()
moving_speed = self.moving_speed()
if speed > moving_speed:
result.append('Speed {:.3f} must not be above Moving speed {:.3f}'.format(
speed, moving_speed))
expected_speeds = {
'Cycling - MTB': ((3, 50), (5, 50)),
'Cycling - Road': ((3, 60), (10, 60)),
}
expected_speed = expected_speeds.get(self.category)
if expected_speed:
template = None
if speed < expected_speed[0][0]:
template = 'Speed {speed:.3f} is very low'
elif speed > expected_speed[0][1]:
template = 'Speed {speed:.3f} is very high'
elif moving_speed < expected_speed[1][0]:
template = 'Moving speed {moving_speed:.3f} is very low'
elif moving_speed > expected_speed[1][1]:
template = 'Moving speed {moving_speed:.3f} is very high'
if template:
result.append(template.format(speed=speed, moving_speed=moving_speed))
return result
    def __repr__(self) -> str:
        """The repr.

        Returns:
            the repr str

        """
        with self._decouple():
            # this should not automatically load the entire gpxfile
            parts = []
            parts.append('public' if self.public else 'private')
            if self.__gpx:
                parts.append(self.category)
                if self.keywords:
                    parts.append(','.join(self.keywords))
                if self.title:
                    parts.append(self.title)
                if self.first_time and self.last_time:
                    parts.append(repr_timespan(self.first_time, self.last_time))
                elif self.first_time:
                    parts.append(str(self.first_time))
                if self.distance:
                    parts.append('{:4.2f}km'.format(self.distance))
            return '{}({})'.format(str(self), ' '.join(parts))
[docs] @staticmethod
def identifier(backend, ident: str) ->str:
"""The full identifier for a gpxfile.
Since we may want to do this without instantiating a gpxfile,
this must be staticmethod or classmethod.
str(gpxfile) uses this. However if a gpxfile has no id_in_backend,
str(gpxfile will create one using title, time, id(gpxfile).
Args:
backend: May be :class:`~gpxity.backend.Backend` or :class:`~gpxity.accounts.Account`
ident: id_in_backend
Returns:
the full identifier.
"""
if isinstance(backend, BackendBase):
account = backend.account
else:
account = backend
if account is None:
return 'unsaved: "{}"'.format(ident)
if not ident:
ident = 'no id_in_backend'
return str(account) + ident
def __str__(self) ->str:
"""The str.
Returns:
a unique full identifier
"""
ident = self.id_in_backend
if not ident:
ident = '"{}" time={} id={}'.format(self.title or 'untitled', self.first_time, id(self))
return self.identifier(self.backend, ident)
[docs] def key(self, with_category: bool = True, with_last_time: bool = True, precision=None) ->str:
"""For speed optimized equality checks, not granted to be exact, but sufficiently safe IMHO.
Args:
with_category: If False, do not use self.category. Needed for comparing
gpxfiles for equality like in unittests because values can change
and information can get lost while copying between different
backends
with_last_time: If False, do not use self.last_time.
precision: For latitude/longitude. After comma digits. Default is as defined by backend or 6.
Returns:
a string with selected attributes in printable form.
"""
self._load_full()
return 'title:{} description:{} keywords:{} category:{}: public:{} last_time:{} angle:{} points:{}'.format(
self.title, self.description,
','.join(self.keywords).lower(), self.category if with_category else '',
self.public, self.last_time if with_last_time else '',
self.angle(precision=precision), self.gpx.get_track_points_no())
def __eq__(self, other) ->bool:
"""equal.
Returns:
result
"""
if self is other:
return True
if not self.points_equal(other, digits=5):
return False
return self.key() == other.key()
def __lt__(self, other) ->bool:
"""less than.
Returns:
result
"""
return self.key() < other.key()
[docs] def angle(self, first_point=None, last_point=None, precision=None) ->float:
"""For me, the earth is flat.
Args:
first_point: if None, first point of GpxFile
last_point: if None, last point of GpxFile
precision: After comma digits. Default is as defined by backend or 6.
Returns:
the angle in degrees 0..360 between start and end.
If we have no two points, return 0
"""
return self.gpx.angle(first_point=first_point, last_point=last_point, precision=precision)
[docs] def segments(self):
"""
A generator over all segments.
Yields:
GPXTrackSegment: all segments in all tracks
"""
for _ in self.gpx.segments():
yield _
[docs] def points(self):
"""
A generator over all points.
Yields:
GPXTrackPoint: all points in all tracks and segments
"""
for _ in self.gpx.points():
yield _
[docs] def point_list(self):
"""A flat list with all points.
Returns:
The list
"""
return sum((x.points for x in self.segments()), [])
[docs] def last_point(self):
"""Return the last point of the track. None if none."""
# TODO: unittest for track without __gpx or without points
return self.gpx.last_point()
    def adjust_time(self, delta):
        """Add a timedelta to all times.

        Args:
            delta (datetime.timedelta): the offset to apply to every time

        """
        self.gpx.adjust_time(delta)
        # the full gpx content changed: write everything back
        self._dirty = 'gpx'
[docs] def points_hash(self) -> float:
"""A hash that is hopefully different for every possible track.
It is built using the combination of all points.
Returns:
The hash
"""
return self.gpx.points_hash()
[docs] def points_equal(self, other, digits=4) ->bool:
"""
Compare points for same position.
Args:
digits: Number of after comma digits to compare
Returns:
True if both gpxfiles have identical points.
All points of all gpxfiles and segments are combined. Elevations are ignored.
"""
return self.gpx.points_equal(other.gpx, digits)
[docs] def index(self, other, digits=4):
"""Check if this gpxfile contains other gpxfile.
This only works if all values for latitude and longitude are
nearly identical.
Useful if one of the gpxfiles had geofencing applied.
Args:
digits: How many after point digits are used
Returns:
None or the starting index for other.points in self.points
"""
return self.gpx.index(other.gpx, digits)
[docs] @staticmethod
def overlapping_times(gpxfiles):
"""Find gpxfiles with overlapping times.
Yields:
groups of gpxfiles with overlapping times. Sorted by time.
This may be very slow for many long gpxfiles.
"""
previous = None
group = list() # GpxFile is mutable, so a set is no possible
for current in sorted(gpxfiles, key=lambda x: x.first_time):
if previous and current.first_time <= previous.last_time:
if previous not in group:
group.append(previous)
group.append(current)
else:
if group:
yield sorted(group, key=lambda x: x.first_time)
group = list()
previous = current
if group:
yield sorted(group, key=lambda x: x.first_time)
def _has_default_title(self) -> bool:
    """Try to check if gpxfile has the default title given by a backend.

    Returns:
        True if so

    """
    # the title of MMT might have been copied into another backend:
    title = self.title
    if not title:
        return True
    if title == '{} track'.format(self.category):
        return True
    # titles consisting only of digits, blanks and separators look auto-generated
    return all(char in '0123456789 :-_' for char in title)
def __merge_metadata(self, other, dry_run):
    """Merge metadata from other into self.

    Used only by self.merge. Title, description, visibility, keywords
    and ids are combined. NOTE(review): for category only a message is
    produced ("wins over") but no assignment happens — confirm whether
    that is intentional.

    Args:
        other: the GpxFile whose metadata is merged in
        dry_run: if True, only report what would change

    Returns:
        a list of verbosity messages

    """
    msg = list()
    # prefer the other title only if ours looks like a backend default
    if not other._has_default_title() and self._has_default_title():
        msg.append('Title: {} -> {}'.format(self.title, other.title))
        if not dry_run:
            self.title = other.title
    if other.description and other.description != self.description:
        msg.append('Additional description: {}'.format(
            other.description))
        if not dry_run:
            self.description += '\n'
            self.description += other.description
    # public wins over private
    if other.public and not self.public:
        msg.append('Visibility: private -> public')
        if not dry_run:
            self.public = True
    if other.category != self.category:
        msg.append('Category: {}={} wins over {}={}'.format(
            other, other.category, self, self.category))
    kw_src = set(other.keywords)
    kw_dst = set(self.keywords)
    if kw_src - kw_dst:
        msg.append('New keywords: {}'.format(','.join(kw_src - kw_dst)))
        if not dry_run:
            self.keywords = kw_src | kw_dst
    # ids are different. keywords are sorted, ids is fifo
    new_ids = self.__clean_ids(self.ids + other.ids)
    if new_ids != self.ids:
        msg.append('New Ids: {}'.format(','.join(set(new_ids) - set(self.ids))))
        if not dry_run:
            self.ids = new_ids
    return msg
def can_merge(self, other, partial: bool = False):
    """Check whether self and other are mergeable.

    Args:
        other: The other GpxFile
        partial: If True, they are mergeable if one of them contains
            the other one.

    Returns: (int, str)
        int is either None or the starting index of the shorter gpxfile in the longer gpxfile
        str is either None or a string explaining why this is not mergeable

    """
    if str(self) == str(other):
        return None, 'Cannot merge identical gpxfiles {}'.format(self)
    if other.gpx.get_track_points_no() == 0 and other.gpx.waypoints:
        # other holds only waypoints: always mergeable
        return 0, None
    if partial:
        # accept containment in either direction
        for container, contained in ((self, other), (other, self)):
            start = container.index(contained)
            if start is not None:
                return start, None
    elif self.points_equal(other):
        return 0, None
    return None, (
        'Cannot merge {} with {} points into {} with {} points'.format(
            other, other.gpx.get_track_points_no(),
            self, self.gpx.get_track_points_no()))
def __merge_waypoints(self, other, dry_run) -> list:
    """Merge waypoints from other with differing positions.

    A waypoint counts as already present when we have one at exactly
    the same (latitude, longitude).

    Args:
        other: the GpxFile whose waypoints are merged in
        dry_run: if True, only report what would be done

    Returns: list(str)
        Messages about what has been done.

    """
    # TODO: unittest
    # Use a set for O(1) duplicate checks instead of rebuilding a
    # coordinate list for every candidate waypoint.
    known_positions = {(x.latitude, x.longitude) for x in self.gpx.waypoints}
    merged_count = 0
    for other_wpt in other.gpx.waypoints:
        position = (other_wpt.latitude, other_wpt.longitude)
        if position not in known_positions:
            known_positions.add(position)
            if not dry_run:
                self.gpx.waypoints.append(other_wpt)
            merged_count += 1
    if merged_count > 0:
        if not dry_run:
            # rewrite once after all waypoints are added — the old code
            # rewrote the gpxfile once per added waypoint inside the loop
            self.rewrite()
        return ['{} got {} waypoints from {}'.format(self, merged_count, other)]
    return []
def __merge_gpxfiles(self, other, dry_run, shorter_at) -> list:
    """Merge track data from other into self.

    If other has more track points, its entire tracks replace ours.
    Afterwards, missing point times are copied from other, starting at
    shorter_at (presumably the index where the shorter gpxfile begins
    inside the longer one, as returned by can_merge — confirm).

    Args:
        other: the GpxFile to merge from
        dry_run: if True, only report what would be done
        shorter_at: starting offset into self.point_list()

    Returns: list(str)
        Messages about what has been done.

    """
    msg = []
    if other.gpx.get_track_points_no() > self.gpx.get_track_points_no():
        if not dry_run:
            # take a deep copy so later changes in other do not leak into self
            self.gpx.tracks = deepcopy(other.gpx.tracks)
            self.rewrite()
        msg.append('{} got entire gpx.tracks from {}'.format(self, other))
    changed_point_times = 0
    self_points = self.point_list()[shorter_at:]
    for self_point, other_point in zip(self_points, other.points()):
        # TODO: unittest with shorter gpxfile
        if not self_point.time:
            if not dry_run:
                self_point.time = other_point.time
            # counted even in dry_run so the message below reports the would-be changes
            changed_point_times += 1
    if changed_point_times:
        if not dry_run:
            self.rewrite()
        msg.append('Copied times for {} out of {} points'.format(
            changed_point_times, self.gpx.get_track_points_no()))
    return msg
def merge(  # noqa pylint: disable=unused-argument
        self, other, remove: bool = False, dry_run: bool = False, copy: bool = False,
        partial: bool = False) -> list:
    """Merge other gpxfile into this one.

    Either the track points must be identical or the other gpxfile
    may only contain waypoints.

    If merging is not possible, raise GpxFile.CannotMerge.

    If either is public, the result is public.
    If self.title seems like a default and other.title does not, use other.title
    Combine description and keywords.
    Merge waypoints as defined by _merge_waypoints().

    Args:
        other (:class:`~gpxity.gpxfile.GpxFile`): The gpxfile to be merged
        remove: After merging succeeded, remove other
        dry_run: if True, do not really apply the merge
        copy: This argument is ignored. It is only here to give
            :meth:`GpxFile.merge() <gpxity.gpxfile.GpxFile.merge>`
            and :meth:`Backend.merge() <gpxity.backend.Backend.merge>` the same interface.
        partial: merges other gpxfile
            if either gpxfile is part of the other one

    Returns: list(str)
        Messages about what has been done.

    """
    assert isinstance(other, GpxFile)
    msg = []
    # was "shorter_at, _ = ...": "_" actually carried the error message
    # and was later raised — give it a speaking name instead
    shorter_at, not_mergeable_reason = self.can_merge(other, partial)
    if shorter_at is None:
        raise GpxFile.CannotMerge(not_mergeable_reason)
    with self.batch_changes():
        msg.extend(self.__merge_gpxfiles(other, dry_run, shorter_at))
        msg.extend(self.__merge_waypoints(other, dry_run))
        msg.extend(self.__merge_metadata(other, dry_run))
        if msg:
            msg = [' ' + x for x in msg]
            msg.insert(0, 'merge{} {!r}'.format(' and remove' if remove else '', other))
            msg.insert(1, '{} into {!r}'.format(' ' * len(' and remove') if remove else '', self))
        if remove:
            if len(msg) <= 2:
                msg.append(
                    'remove duplicate {!r}: It was identical with {!r}'.format(other, self))
            if not dry_run:
                other.remove()
    return msg
def fix_orux(self):
    """Try to fix the Oruxmaps day-jump problem.

    Older Oruxmaps switched the day back by one day after exactly
    24 hours. Please backup your gpxfile before doing this.

    Returns: A list with messages. Currently nothing.

    """
    changed = self.gpx.fix_orux()
    if changed:
        self._dirty = 'gpx'
    return []
def split_at_stops(self, minutes):
    """Split where things happen.

    Whenever the time jumps back or more than X
    minutes into the future, split the segment at that point.

    Args:
        minutes: Shortest possible stop for splitting

    Returns:
        A list of message strings, usable for verbose output.
        Currently always empty.

    """
    if self.gpx.fix_jumps(minutes=minutes):
        self._dirty = 'gpx'
    # the docstring always promised a list but the function returned
    # None; return [] like the sibling fix_orux() does
    return []
def __similarity_to(self, other):
    """Return a float 0..1: 1 is identity.

    The result is cached symmetrically in both objects, keyed by id()
    of the partner. NOTE(review): _similarity_others also stores the
    partner object itself, presumably to keep it alive so its id() is
    not reused — confirm (the module imports weakref, but a plain dict
    would pin the object).

    Args:
        other: the gpxfile to compare with

    Returns:
        similarity as float, combining length similarity and shared-point ratio

    """
    def simple(gpxfile):
        """Simplified gpxfile: rounded lat/lon of a simplified polyline."""
        return [(round(x.latitude, 3), round(x.longitude, 3))
                for x in simplify_polyline(list(gpxfile.points()), max_distance=50)]
    if id(other) not in self._similarity_others:
        simple1 = simple(self)
        simple2 = simple(other)
        # 1.0 when both simplified tracks have the same length
        max_len = max([len(simple1), len(simple2)])
        similar_length = 1.0 - abs(len(simple1) - len(simple2)) / max_len
        set1 = set(simple1)
        set2 = set(simple2)
        # share of common points relative to the smaller set
        # NOTE(review): raises ZeroDivisionError if a track has no points — confirm callers guard this
        min_len = min([len(set1), len(set2)])
        similar_points = len(set1 & set2) / min_len
        result = similar_length * similar_points
        # cache in both directions
        self._similarity_others[id(other)] = other
        self._similarities[id(other)] = result
        other._similarity_others[id(self)] = self
        other._similarities[id(self)] = result
    return self._similarities[id(other)]
def similarity(self, others):
    """Return a float 0..1: 1 is identity.

    The highest value for others is returned.

    """
    # accept a single GpxFile as well as an iterable of them
    candidates = [others] if isinstance(others, GpxFile) else others
    return max(self.__similarity_to(candidate) for candidate in candidates)
def time_offset(self, other):
    """Return the constant time difference between both gpxfiles.

    If time and last_time have the same offset between both gpxfiles,
    return that time difference. Otherwise return None.

    """
    own_gpx = self.gpx
    return own_gpx.time_offset(other.gpx)
@property
def ids(self):
    """Return ids for all backends where this gpxfile has already been.

    You can modify it but your changes will only be saved if and when
    the entire gpxfile is saved.

    They are sorted by ascending age.
    Only the 5 youngest are kept.
    If the same id_in_backend appears in more than one directory, keep only the youngest.

    Returns: list( (str))
        a list of gpxfile ids

    """
    # NOTE(review): keywords serving as the "not fully loaded" marker
    # suggests ids are encoded inside the keywords — confirm against Gpx
    if self.__gpx.keywords == Gpx.undefined_str:
        self._load_full()
    return self.__clean_ids(self.__gpx.ids)
@ids.setter
def ids(self, value):
    """Set the ids; stores the cleaned value and marks the gpxfile dirty.

    Args:
        value: iterable of id strings; passed through __clean_ids
            (deduplicated, at most 5 entries) before storing

    """
    cleaned = self.__clean_ids(value)
    if cleaned != self.__gpx.ids:
        self._load_full()
        self.__gpx.ids = cleaned
        # presumably re-encodes the ids into persisted gpx fields — verify Gpx.encode
        self.__gpx.encode()
        self._dirty = 'gpx'
@staticmethod
def __clean_ids(original):
    """Remove redundancies and old ids.

    1. if the same id_in_backend is in several directories, keep only the first one
    2. keep only 5

    Returns: The cleaned list of ids

    """
    result = list()
    known_urls = set()
    known_ids = set()
    for candidate in original:
        if candidate in known_ids:
            continue
        known_ids.add(candidate)
        try:
            account, _ = BackendBase.parse_objectname(candidate)
        except KeyError:
            # ids that cannot be parsed are silently dropped
            continue
        # for Directory backends, keep only the first id per url
        if account.backend != 'Directory' or account.url not in known_urls:
            if account.backend == 'Directory':
                known_urls.add(account.url)
            result.append(candidate)
    result = result[:5]
    if result != original:
        logging.debug('ids: %s -> %s', original, result)
    return result
def split_segments(self):
    """Create separate gpxfiles for every track/segment.

    Every track/segment combination becomes a new gpxfile in the same
    backend, with id '<id_in_backend> Trk <n> Seg <m>'. If creating the
    new gpxfiles fails, the previously cloned original is re-added to
    the backend as a best-effort restore.

    Raises:
        Exception: if this gpxfile is not attached to a backend

    """
    backend = self.backend
    if not backend:
        raise Exception('GpxFile.split_segments() needs a backend')
    clone = self.clone()
    try:
        for track_idx, track in enumerate(clone.gpx.tracks):
            for seg_idx, segment in enumerate(track.segments):
                gpxfile = self.clone()
                gpx_track = GPXTrack()
                gpx_track.segments.append(segment)
                gpxfile.gpx.tracks = [gpx_track]
                gpxfile.id_in_backend = '{} Trk {} Seg {}'.format(self.id_in_backend, track_idx + 1, seg_idx + 1)
                backend.add(gpxfile)
    except Exception as exc:  # pylint: disable=broad-except
        # was "except BaseException": that also swallowed
        # KeyboardInterrupt/SystemExit. Restore the clone only for
        # ordinary errors and let real aborts propagate.
        logging.error('split:%s', exc)
        backend.add(clone)
def split_waypoints(self):
    """Split into separate tracks at waypoints.""" # noqa
    def new_track(points):
        """A new track from points.

        Returns: GPXTrack

        """
        segment = GPXTrackSegment()
        track = GPXTrack()
        track.segments.append(segment)
        segment.points = points
        return track
    tracks = list()
    points = self.point_list()
    all_waypoints = self.__gpx.waypoints
    full_segment = GPXTrackSegment()
    full_segment.points = points
    # index of the nearest track point for every waypoint, descending
    nearest_indices = list(reversed(sorted(full_segment.get_nearest_location(x)[1] for x in all_waypoints)))
    # was logging.error: this is plain progress information, log it at
    # DEBUG like the other message below
    logging.debug('nearest:%s', nearest_indices)
    for idx in nearest_indices:
        # cut off the tail after each waypoint, keep the head for the next cut
        tracks.append(new_track(points[idx + 1:]))
        points = points[:idx + 1]
    if points:
        tracks.append(new_track(points))
    self.__gpx.tracks = list(reversed(tracks))
    for _ in tracks:
        logging.debug('Track with %d points', len(_.segments[0].points))
    self.rewrite() # TODO: only if changed
def split_points(self, max_points):
    """Split into separate segments every # points.

    Args:
        max_points: maximum number of points per segment

    """ # noqa
    def new_segment(points):
        """A new segment from points.

        Returns: GPXTrackSegment

        """
        segment = GPXTrackSegment()
        segment.points = points
        return segment
    changed = False
    for track in self.gpx.tracks:
        logging.debug('splitting track with %s segments every %s points', len(track.segments), max_points)
        segments = list()
        for segment in track.segments:
            logging.debug('segment points: %s', len(segment.points))
            if len(segment.points) > max_points:
                changed = True
                all_points = segment.points[:]
                logging.debug('all_points: %s', len(all_points))
                while len(all_points) > 0:
                    # was logging.error: plain progress information belongs at DEBUG
                    logging.debug('new segment with %d points', max_points)
                    segments.append(new_segment(all_points[:max_points]))
                    all_points = all_points[max_points:]
            else:
                # keep segments that are already short enough — the old
                # code silently dropped them when rebuilding track.segments
                segments.append(segment)
        track.segments = segments
    if changed:
        self.rewrite()
def locate_point(self, track=0, segment=0, point=0) -> str:
    """Determine name of place for point.

    Saves that in point.name for caching. If backend.account.country is given,
    add the country name only if it is different.

    If no such point exists, just do nothing and return "nowhere"

    Args:
        track, segment, point: Indices into the respective arrays

    Returns: A string

    """
    try:
        place_name, newly_located = self.gpx.locate_point(track, segment, point)
    except IndexError:
        return 'nowhere'
    # persist the cached name only if it was looked up just now
    if newly_located and self.backend:
        self.rewrite()
    return place_name
def add_locations(self, segments=False):
    """Call locate_point for the first point.

    Args:
        segments: Also do that for the first point of each segment.

    """
    with self.batch_changes():
        self.locate_point()
        if not segments:
            return
        for trk_no, trk in enumerate(self.gpx.tracks):
            for seg_no in range(len(trk.segments)):
                self.locate_point(track=trk_no, segment=seg_no)
def add_segment_waypoints(self):
    """Give every segment start a waypoint.

    The name looks like :literal:`Trk/Seg 2/4 Mainz-Wiesbaden`.
    Existing such waypoints are removed if they no longer belong
    to a segment start.

    For the involved points (first and last of each segment) see
    :meth:`locate_point`.

    """
    if not self.gpx.add_segment_waypoints():
        return
    # something changed: mark the gpx data dirty
    self._dirty = 'gpx'
def join_tracks(self, force=False):
    """Join all tracks to a single track.

    Metadata from all but the first track is thrown away.
    If metadata will be lost, it is printed and nothing is done unless force is True

    Args:
        force: if True, join even if metadata is lost

    Returns: list()
        A list with text strings about lost metadata

    """
    lost = self.gpx.join_tracks(force)
    if lost:
        indented = [' ' + x for x in lost]
        if force:
            header = 'Joining tracks in {trk} lost metadata from joined tracks:'
        else:
            header = 'Joining tracks in {trk} would lose metadata from joined tracks, use the force option:'
        result = [header.format(trk=self)] + indented
    else:
        result = lost
    if not result or force:
        self.rewrite()
    return result
def untangle(self, force=False):
    """Locate stops and clean away its local erratic movements.

    Returns: A list of strings describing what happens/would happen

    """
    messages = self.gpx.untangle(force)
    if messages and force:
        self.rewrite()
    # prefix every message with the gpxfile name
    return [str(self) + ':' + x for x in messages]
def refence(self, force=False):
    """Re-apply fences. They might now be more restrictive.

    Returns: A list of strings describing what happens/would happen

    """
    self._load_full()
    messages = list()
    if self._illegal_points:
        messages.append('Removing {} points within fences'.format(self._illegal_points))
        if force:
            # rewriting while fenced drops the fenced points
            with self.fenced():
                self.rewrite()
    return messages