#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""This module defines :class:`~gpxity.backend.Backend`."""
# pylint: disable=protected-access
import datetime
from inspect import getmembers, isfunction
from contextlib import contextmanager
import logging
from copy import deepcopy
from .accounts import Account
from .gpxfile import GpxFile
from .util import collect_gpxfiles
from .gpx import Gpx
from .backend_base import BackendBase
__all__ = ['Backend']
[docs]class Backend(BackendBase):
"""A place where gpxfiles live. Something like the filesystem or http://mapmytracks.com.
A Backend should hold only gpxfiles for one person, and they
should not overlap in time. This is not enforced but sometimes
behaviour is undefined if you ignore this.
A Backend be used as a context manager.
A Backend allows indexing by normal int index, by :class:`GpxFile <gpxity.gpxfile.GpxFile>`
and by :attr:`GpxFile.id_in_backend <gpxity.gpxfile.GpxFile.id_in_backend>`.
:literal:`if 'ident' in backend` is possible.
len(backend) shows the number of gpxfiles. Please note that Code
like :literal:`if backend:` may not behave as expected. This will be False if the backend
has no gpxfile. If that is not what you want, consider :literal:`if backend is not None`
The backend will automatically synchronize. So something like :literal:`len(Backend())` will work.
However, some other Backend pointing to the same storage or even a different process
might change things. If you want to cope with that, use :meth:`scan`.
Not all backends support all methods. The unsupported methods
will raise NotImplementedError. As a convenience every backend
has a list **supported** to be used like :literal:`if 'gpxfile' in backend.supported:`
where `gpxfile` is the name of the method.
Backends support no locking. If others modify a backend concurrently, you may
get surprises. It is up to you to handle those.
Some backends may use cookies.
Args:
account (:class:`~gpxity.accounts.Account`): The account to be used.
Alternatively a dict can be passed to build an ad hoc :class:`~gpxity.accounts.Account`
instance.
Attributes:
supported (set(str)): The names of supported methods. Creating the first instance of
the backend initializes this. Only methods which may not be supported are mentioned here.
If a particular value write_* like write_public does not exist, the entire gpxfile is written instead
which normally results in a new ident for the gpxfile.
Some special values are:
* rename: allows assigning values to id_in_backend
full_support (set(str)): All possible values for the supported attribute.
url (str): the address. May be a real URL or a directory, depending on the backend implementation.
Every implementation may define its own default for url. Must never end with '/' except for
Directory(url='/').
test_is_expensive: For internal use. If True, the self tests will reduce test cases and try to
avoid too much usage of the backend.
max_field_sizes: Some backends have a limited size for some attributes like keywords. This
is only an approximative guess. The backend will not protect you from overriding it
but the unittests will try to stay within those limits.
point_precision: The precision supported by this backend. We are never more precise than 6.
That is the digits after the decimal separator.
supported_categories: The categories supported by this backend. The first one is used as default.
accepts_zero_points: True if the Backend accepts a GpxFile without Points
"""
# pylint: disable=too-many-instance-attributes
[docs] class NoMatch(Exception):
"""Is raised if a gpxfile is expected to pass the match filter but does not"""
supported = set()
default_url = None # Override in the backends
test_is_expensive = True
max_field_sizes = {}
_category_decoding = dict()
_category_encoding = dict()
full_support = (
'scan', 'remove', 'write', 'write_title', 'write_public',
'own_categories', 'rename',
'write_category', 'write_description', 'keywords', 'write_add_keywords', 'write_remove_keywords')
_max_length = dict()
point_precision = 5
_timeout = None
__all_backends = dict()
accepts_zero_points = False
# It is important that we have only one global session
# because gpsies.com seems to have several servers and their
# synchronization is sometimes slower than expected. See
# cookie "SERVERID".
_session = dict()
def __init__(self, account):
"""See class docstring."""
if self.is_disabled():
raise Backend.BackendException('class {} is disabled'.format(self.__class__.__name__))
if not isinstance(account, Account):
raise Exception('Backend() wants an Account')
account.config = deepcopy(account.config)
self.account = account
if 'url' not in self.account.config:
self.account.config['url'] = self.default_url
if 'backend' not in self.account.config:
self.account.config['backend'] = self.__class__.__name__
self._decoupled = False
self.__gpxfiles = list()
self._gpxfiles_fully_listed = False
self.__match = None
self.logger = logging.getLogger(str(self))
# do not want to see "Resetting dropped connection"
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
self._cached_subscription = None # to be used by specific Backend classes
@property
def timeout(self):
"""Timeout from account or class default.
Returns: The timeout
"""
return self.account.timeout or self._timeout
@property
def url(self):
"""get self.account.url.
This also makes sure Backend.url is not writable.
Returns: The url
"""
if self.account.url is None:
self.account.config['url'] = self.default_url
return self.account.url
def _has_default_url(self) ->bool:
"""Check if the backend has the default url.
Returns:
True if so
"""
if self.default_url is None:
return False
return self.url == self.default_url
def _get_author(self) ->str:
"""Get the username for the account.
Raise BackendException if no username is given.
Returns:
The username
"""
author = self.account.username
if not author:
raise Backend.BackendException('{} needs a username'.format(self.url))
return author
def __str__(self) ->str:
"""A unique identifier for every physical backend.
Two Backend() instances pointing to the same physical backend have the same identifier.
Returns:
A unique identifier
"""
return self.account.name
supported_categories = GpxFile.categories
@contextmanager
def _decouple(self):
"""Context manager: disable automic synchronization for the backend.
In that state, automatic writes of changes into
the backend are disabled, and if you access attributes which
would normally trigger a full load from the backend, they will not.
Use this to avoid recursions.
You should never need this unless you implement a new backend.
"""
prev_decoupled = self._decoupled
self._decoupled = True
try:
yield
finally:
self._decoupled = prev_decoupled
@property
def match(self):
"""Filter gpxfiles.
A function with one argument returning None or str. The backend will call
this with every gpxfile and ignore gpxfiles where match does not return None.
The returned str should explain why the gpxfile does not match.
If you change a gpxfile such that it does not match anymore, the exception
NoMatch will be raised and the match stays unchanged.
"""
return self.__match
@match.setter
def match(self, value):
"""see match.getter."""
old_match = self.__match
self.__match = value
try:
self.scan()
except self.NoMatch:
self.__match = old_match
raise
@classmethod
def _define_support(cls):
"""If the first thing a method does is raising NotImplementedError, it is marked as unsupported."""
support_mappings = {
# map internal names to more user friendly ones. See doc for
# Backend.supported.
'_change_ident': 'rename',
'_list': 'scan',
'_remove_ident': 'remove',
'_write_all': 'write'}
cls.supported = set()
cls.supported.add('keywords') # default
if cls.supported_categories != GpxFile.categories:
cls.supported.add('own_categories')
for name, method in getmembers(cls, isfunction):
if name in support_mappings:
if cls._is_implemented(method):
cls.supported.add(support_mappings[name])
elif name.startswith('_write_') and name != '_write_attribute':
if cls._is_implemented(method):
cls.supported.add(name[1:])
[docs] @classmethod
def decode_category(cls, value: str) ->str:
"""Translate the value from the backend into one out of GpxFile.categories.
Returns:
The decoded name
"""
if value in GpxFile.categories:
return value
if value in GpxFile._obsolete_categories:
return GpxFile._obsolete_categories[value]
if value.capitalize() in GpxFile.categories:
return value.capitalize()
if value not in cls._category_decoding:
raise cls.BackendException('{} gave us an unknown gpxfile type "{}"'.format(cls.__name__, value))
return cls._category_decoding[value]
[docs] @classmethod
def encode_category(cls, value: str) ->str:
"""Translate internal value (out of GpxFile.categories) into the backend specific value.
Returns:
The encoded name
"""
if value in GpxFile._obsolete_categories:
value = GpxFile._obsolete_categories[value]
if value in cls.supported_categories:
return value
if value.lower() in cls.supported_categories:
return value.lower()
if value in cls._category_encoding:
return cls._category_encoding[value]
for key, target in cls._category_decoding.items():
if value.lower() == target.lower():
return key
if value == Gpx.undefined_str:
return cls.encode_category(GpxFile.categories[0])
raise cls.BackendException('{} has no equivalent for "{}"'.format(cls.__name__, value))
@staticmethod
def _encode_keyword(value: str) ->str:
"""Replicate the translation the backend does. MMT for example capitalizes all words.
Returns:
the encoded keyword
"""
return value
@staticmethod
def _encode_description(gpxfile) ->str:
"""A backend might put keywords into the description. WPTrackserver does.
Returns: The string to be saved in the backend
"""
return gpxfile.description
@classmethod
def _decode_description(cls, gpx, value):
"""A backend might put keywords into the description. WPTrackserver does.
Returns: The description
"""
gpx.description = value
return value
[docs] def get_time(self) ->datetime.datetime: # pylint: disable=no-self-use
"""get time from the server where backend is located as a Linux timestamp.
A backend implementation does not have to support this.
Returns: datetime.datetime
"""
return datetime.datetime.now()
def _change_ident(self, gpxfile, new_ident: str):
"""Change the id in the backend.
If new_ident already exists, the backend is free to
change it to a unique name or to raise an Exception.
"""
raise NotImplementedError
[docs] def scan(self, now: bool = False) ->None:
"""Enforce a reload of the list of all gpxfiles in the backend.
This will be delayed until the list is actually needed again.
If this finds an unsaved gpxfile not matching the current match
function, an exception is thrown.
Saved Tracks not matching the current match will no be loaded.
Args:
now: If True, do not delay scanning.
"""
self._gpxfiles_fully_listed = False
if now:
self._scan()
@property
def is_scanned(self) ->bool:
"""Check if the backend has already been scanned for gpxfiles.
Returns: (bool)
The answer
"""
return self._gpxfiles_fully_listed
def _scan(self) ->None:
"""load the list of all gpxfiles in the backend if not yet done.
Enforce this by calling :meth:`scan` first.
"""
if not self._gpxfiles_fully_listed and not self._decoupled:
self._gpxfiles_fully_listed = True
unsaved = [x for x in self.__gpxfiles if not x.id_in_backend]
if self.__match is not None:
for gpxfile in unsaved:
# side effect: raises exception if no match
self.matches(gpxfile, 'scan')
self.__gpxfiles = unsaved
if 'scan' in self.supported:
match_function = self.__match
self.__match = None
try:
# _list loads ALL gpxfiles, match will be
# applied in a second loop. This way the Backend implementations
# do not have to worry about the match code.
with self._decouple():
self._list()
finally:
self.__match = match_function
if self.__match is not None:
self.__gpxfiles = [x for x in self.__gpxfiles if self.matches(x)]
def _found_gpxfile(self, ident: str, gpx):
"""Create an empty gpxfile for ident and insert it into this backend.
Returns:
the new gpxfile
"""
result = GpxFile(gpx)
with self._decouple():
result._set_backend(self)
result.id_in_backend = ident
self._append(result)
return result
def _list(self):
"""Load all gpxfile headers and append them to the backend.
The gpxfiles will not be loaded if possible.
"""
raise NotImplementedError()
def _read_all_decoupled(self, gpxfile) ->None:
"""Decouple and call the backend specific _read."""
with self._decouple():
self._read(gpxfile)
gpxfile.gpx.default_country = self.account.country
points_read = gpxfile.gpx.get_track_points_no()
with gpxfile.fenced():
fenced_points = gpxfile.gpx.get_track_points_no()
gpxfile._illegal_points = points_read - fenced_points
def _read(self, gpxfile) ->None:
"""fill the gpxfile with all its data from source."""
raise NotImplementedError()
[docs] def matches(self, gpxfile, exc_prefix: str = None):
"""match gpxfile against the current match function.
Args:
exc_prefix: If not None, use it for the beginning of an exception message.
If None, never raise an exception
Returns:
True for match
"""
if self.__match is None:
return True
match_error = self.__match(gpxfile)
if match_error and exc_prefix:
raise Backend.NoMatch('{}: {} does not match: {}'.format(exc_prefix, gpxfile, match_error))
return match_error is None
def _needs_full_save(self, changes) ->bool:
"""Do we have to rewrite the entire gpxfile?.
Returns:
True if we must save fully
"""
for change in changes:
if change == 'all':
return True
write_name = 'write_{}'.format(change.split(':')[0])
if write_name not in self.supported:
return True
return False
[docs] def add(self, gpxfile):
"""
Add a gpxfile to this backend.
No gpxfile already existing in this backend will be overwritten.
If :attr:`GpxFile.id_in_backend <gpxity.gpxfile.GpxFile.id_in_backend>`
is not given, the backend will create a unique value. If it is given,
the backend will try to use it or create a new value at its discretion.
Some Backends will not accept a gpxfile without Points. Only Directory
is granted to handle a gpxfile wihout points.
Note that some backends may reject a gpxfile if it is very
similar to an existing gpxfile even if it belongs to some other user.
If the gpxfile object is already in the list of gpxfiles, raise ValueError.
If the gpxfile does not pass the current match function, raise an exception.
Args:
gpxfile (~gpxity.gpxfile.GpxFile): The gpxfile we want to save in this backend.
Returns:
~gpxity.gpxfile.GpxFile: The saved gpxfile. If the original gpxfile lives in a different
backend, a new gpxfile living in this backend will be created
and returned.
"""
if self._decoupled:
raise Exception('A backend cannot save() while being decoupled. This is probably a bug in gpxity.')
self.matches(gpxfile, 'add')
if gpxfile.backend is not self and gpxfile.backend:
# we do not want clone() loading the gpxfile because
# that cannot be done with fences applied
gpxfile._illegal_points = 0 # when copying a GpxFile, it is OK to lose points
had_ids = gpxfile.ids
had_ids.append(str(gpxfile))
gpxfile._load_full()
with gpxfile.fenced():
new_gpxfile = gpxfile.clone()
new_gpxfile.ids = had_ids
try:
self._check_id_legal(gpxfile.id_in_backend)
new_gpxfile.id_in_backend = gpxfile.id_in_backend
except ValueError:
pass
else:
if any(x is gpxfile for x in self.__gpxfiles):
raise ValueError(
'Already in list: GpxFile {} with id={}, have={}'.format(
gpxfile, id(gpxfile), ','.join(str(x) for x in self)))
new_gpxfile = gpxfile
try:
with self._decouple():
new_gpxfile._set_backend(self)
self.__check_empty(gpxfile)
self._write_all(new_gpxfile)
self._append(new_gpxfile)
gpxfile._clear_dirty()
return new_gpxfile
except Exception:
# do not do self.remove. If we try to upload the same gpxfile to gpsies,
# gpsies will assign the same trackid, and we come here. __gpxfiles will
# only hold the first uploaded gpxfile, and remove would remove that
# instance instead of this one.
# TODO: do we have a unittest for that case?
self.__gpxfiles = [x for x in self.__gpxfiles if x is not new_gpxfile]
with self._decouple():
new_gpxfile.id_in_backend = None
new_gpxfile._set_backend(None)
raise
def __check_empty(self, gpxfile):
"""Check if the track is empty but the backend needs points.
May raise an exception.
"""
if not self.accepts_zero_points and gpxfile.gpx.get_track_points_no() == 0:
raise self.BackendException(
'{} does not accept GpxFile without points: {}'.format(
self.__class__.__name__, gpxfile))
def _rewrite(self, gpxfile, changes):
"""Rewrite the full gpxfile.
Used only by GpxFile when things change.
If this changes track.id_in_backend, the GpxFile with the old id_backend is removed.
"""
assert gpxfile.backend is self
assert self._has_item(gpxfile.id_in_backend), '{}: its id_in_backend {} is not in {}'.format(
gpxfile, gpxfile.id_in_backend, ' / '.join(str(x) for x in self))
assert gpxfile._dirty
needs_full_save = self._needs_full_save(changes)
self.matches(gpxfile, '_rewrite')
if needs_full_save:
with gpxfile.fenced():
self.__check_empty(gpxfile)
old_id = gpxfile.id_in_backend
new_id = self._write_all(gpxfile)
if old_id and old_id != new_id:
self._remove_ident(old_id)
gpxfile.id_in_backend = new_id
else:
for change in changes:
_ = change.split(self._dirty_separator)
write_name = '_write_{}'.format(_[0])
if len(_) == 1:
getattr(self, write_name)(gpxfile)
elif len(_) == 2:
getattr(self, write_name)(gpxfile, _[1])
else:
raise Exception('dirty {} got too many arguments:{}'.format(write_name, _[1:]))
def _write_all(self, gpxfile) ->str:
"""the actual implementation for the concrete Backend.
Writes the entire GpxFile.
Returns:
The new id_in_backend
"""
raise NotImplementedError()
[docs] def remove(self, value) ->None:
"""
Remove gpxfile. This can also be done for gpxfiles not passing the current match function.
Args:
value: If it is not an :class:`~gpxity.gpxfile.GpxFile`, :meth:`remove` looks
it up by doing :literal:`self[value]`
"""
gpxfile = value if hasattr(value, 'id_in_backend') else self[value]
if gpxfile.id_in_backend:
self._remove_ident(gpxfile.id_in_backend)
with self._decouple():
gpxfile.gpx.is_complete = True # we do not care about partially loaded GpxFile when deleting it
gpxfile._set_backend(None)
try:
self.__gpxfiles = [x for x in self.__gpxfiles if x.id_in_backend != gpxfile.id_in_backend]
except ValueError:
pass
def _remove_ident(self, ident: str) ->None:
"""backend dependent implementation."""
raise NotImplementedError()
def _lifetrack_start(self, gpxfile, points) -> str: # pylint: disable=unused-argument
"""Modelled after MapMyTracks. I hope this matches other services too.
This will always produce a new gpxfile in the backend.
Default is to just add the points to the gpxfile.
Args:
gpxfile(GpxFile): Holds initial data and points already added.
keep in mind that this process might restart which should
be invisible to the target.
points: Initial points
Returns: The new id_in_backend
For details see :meth:`GpxFile.gpxfile() <gpxity.lifetrack.Lifetrack.start>`.
"""
if gpxfile.id_in_backend not in self:
gpxfile.id_in_backend = self.add(gpxfile.clone()).id_in_backend
return gpxfile.id_in_backend
def _lifetrack_update(self, gpxfile, points):
"""If the backend does not support lifetrack, just add the points to the gpxfile.
Args:
gpxfile(GpxFile): Holds initial data
points: If None, stop tracking. Otherwise, start tracking
and add points.
For details see :meth:`GpxFile.gpxfile() <gpxity.lifetrack.Lifetrack.update>`.
"""
self[gpxfile.id_in_backend].add_points(points)
def _lifetrack_end(self, gpxfile):
"""Default: Nothing needs to be done."""
[docs] def remove_all(self):
"""Remove all gpxfiles we know about.
If their :attr:`id_in_backend` has meanwhile been changed
through another backend instance or another process, we
cannot find it anymore. We do **not** rescan all gpxfiles in the backend.
If you want to make sure it will be empty, call :meth:`scan` first.
If you use a match function, only matching gpxfiles will be removed."""
for gpxfile in list(self):
if self.matches(gpxfile):
self.remove(gpxfile)
[docs] def detach(self):
"""Should be called when access to the Backend is not needed anymore."""
def __contains__(self, value) ->bool:
"""value is either an a gpxfile or a gpxfile id.
Does NOT load gpxfiles, only checks what is already known.
Returns:
True if we have the item
"""
self._scan()
return self._has_item(value)
def _has_item(self, index) ->bool:
"""like __contains__ but for internal use: does not call _scan first.
Must not call self._scan.
Returns:
True if we have the item
"""
if hasattr(index, 'id_in_backend') and index in self.__gpxfiles:
return True
if isinstance(index, str) and index in [x.id_in_backend for x in self.__gpxfiles]:
return True
return False
def __getitem__(self, index):
"""Allow accesses like alist[a_id].
Do not call this when implementing a backend because this always calls scan() first.
Instead use :meth:`_has_item`.
Returns:
the gpxfile
"""
self._scan()
if isinstance(index, int):
return self.__gpxfiles[index]
for _ in self.__gpxfiles:
if _ is index or _.id_in_backend == index:
return _
raise IndexError
def __len__(self) ->int:
"""do not call this when implementing a backend because this calls scan().
Returns:
the length
"""
self._scan()
return len(self.__gpxfiles)
[docs] def real_len(self) ->int:
"""len(backend) without calling scan() first.
Returns:
the length"""
return len(self.__gpxfiles)
def _append(self, gpxfile):
"""Append a gpxfile to the cached list."""
if gpxfile.id_in_backend is not None and not isinstance(gpxfile.id_in_backend, str):
raise Exception('{}: id_in_backend must be str'.format(gpxfile))
tracks_with_this_id = [x for x in self.__gpxfiles if x.id_in_backend == gpxfile.id_in_backend]
if tracks_with_this_id:
assert len(tracks_with_this_id) == 1
track_with_this_id = tracks_with_this_id[0]
if not track_with_this_id.backend:
# we actually replace the unsaved gpxfile with the new one
del self.__gpxfiles[track_with_this_id]
if gpxfile.id_in_backend is not None and any(
x.id_in_backend == gpxfile.id_in_backend for x in self.__gpxfiles):
# cannot do "in self" because we are not decoupled, so that would call _scan()
raise ValueError(
'Backend.append(gpxfile {}): its id_in_backend {} is already in list: GpxFile={}, list={}'.format(
str(gpxfile), gpxfile.id_in_backend, self[gpxfile.id_in_backend], self.__gpxfiles))
self.matches(gpxfile, 'append')
self.__gpxfiles.append(gpxfile)
def __repr__(self):
"""do not call len(self) because that does things.
Returns:
The repr str
"""
return '{}({} in {})'.format(self.__class__.__name__, len(self.__gpxfiles), self.account)
def __enter__(self):
"""See class docstring.
Returns:
self
"""
return self
def __exit__(self, exc_type, exc_value, trback):
"""See class docstring."""
self.detach()
def __iter__(self):
"""See class docstring.
Returns:
iterator over gpxfiles
"""
self._scan()
return iter(self.__gpxfiles)
def __bool__(self):
"""Return True always.
A programmer (myself included) may be tempted
to say :literal:`if gpxfile.backend:` for checking if the
GpxFile has a backend assigned. But without __bool__
that would do len(backend) wich scans the - possibly remote - backend.Backend
Returns: True
"""
return True
def __eq__(self, other) ->bool: # TODO: use str
"""True if both backends have the same gpxfiles.
Returns:
True if both backends have the same gpxfiles
"""
return {x.key() for x in self} == {x.key() for x in other}
def __copy(self, other_gpxfiles, remove, dry_run):
"""Copy other_gpxfiles into self. Used only by self.merge().
Returns:
verbose messages
"""
result = list()
for old_gpxfile in other_gpxfiles:
if not dry_run:
new_gpxfile = self.add(old_gpxfile)
result.append('{} {} -> {}'.format(
'blind move' if remove else 'blind copy', old_gpxfile, self.account if dry_run else new_gpxfile))
if remove and not dry_run:
old_gpxfile.remove()
return result
def __find_mergable_groups(self, gpxfiles, partial: bool = False):
"""Find mergable groups.
Returns:
A list of gpxfiles. The first one is the sink for the others
"""
result = list()
rest = list(self)
rest.extend(x for x in gpxfiles if str(x.backend) != str(self))
while rest:
root = rest[0]
group = list([root])
group.extend(x for x in rest[1:] if root.can_merge(x, partial)[0] is not None) # noqa
# merge target should be the longest gpxfile in self:
group.sort(key=lambda x: (x.backend is self, x.gpx.get_track_points_no()), reverse=True)
result.append(group)
for _ in group:
rest = [x for x in rest if x is not _]
return result
[docs] def merge(self, other, remove: bool = False, dry_run: bool = False, copy: bool = False,
partial: bool = False) ->list: # noqa
"""merge other backend or a single gpxfile into this one. Tracks within self are also merged.
If two gpxfiles have identical points, or-ify their other attributes.
Args:
other: The backend or a single gpxfile to be merged
remove: If True, remove merged gpxfiles
dry_run: If True, do not really merge or remove
copy: Do not try to find a matching gpxfile, just copy other into this Backend
partial: If True, two gpxfiles are mergeable if one of them contains the other one.
Returns:
list(str) A list of messages for verbose output
"""
# pylint: disable=too-many-branches,too-many-locals
# TODO: test for dry_run
# TODO: test for merging single track
# TODO: test for merging a backend or a gpxfile with itself. Where
# they may be identical instantiations or not. For all backends.
result = list()
other_gpxfiles = collect_gpxfiles(other)
if copy:
return self.__copy(other_gpxfiles, remove, dry_run)
null_datetime = datetime.datetime(year=1, month=1, day=1)
groups = self.__find_mergable_groups(other, partial)
merge_groups = [x for x in groups if len(x) > 1]
merge_groups.sort(key=lambda x: x[0].first_time or null_datetime)
if merge_groups:
result.append('{} mergable groups:'.format(len(merge_groups)))
for _ in merge_groups:
result.append(' {} ----> {}'.format(', '.join(str(x) for x in _[1:]), _[0])) # noqa
for destination, *sources in groups:
if destination.backend is not self:
new_destination = destination if dry_run else self.add(destination)
result.append('{} {} -> {}'.format(
'move' if remove else 'copy', destination, self if dry_run else new_destination))
if remove and not dry_run:
destination.remove()
destination = new_destination
for source in sources:
result.extend(destination.merge(
source, remove=remove, dry_run=dry_run, partial=partial))
return result
@staticmethod
def _html_encode(value) ->str:
"""encode str to something gpies.com accepts.
Returns:
the encoded value
"""
if value is None:
return ''
return value.encode('ascii', 'xmlcharrefreplace').decode()
[docs] def flush(self):
"""Some backends delay actual writing. This enforces writing.
Currently, only the Mailer backend can delay, it will bundle all
mailed gpxfiles into one mail instead of sending separate mails
for every gpxfile. Needed for lifetracking.
"""
[docs] def clone(self):
"""return a clone."""
return self.__class__(self.account)
def _get_current_keywords(self, gpxfile): # pylint:disable=no-self-use
"""A backend might be able to return the currently stored keywords.
This is useful for unittests: Compare the internal state with what the
backend actually says.
"""
return gpxfile.keywords
[docs] @classmethod
def instantiate(cls, name: str):
"""Instantiate a Backend or a GpxFile out of its identifier.
Calls instantiate_backend and optionally changes result to GpxFile.
Args:
name: The string identifier to be parsed
Returns:
A GpxFile or a Backend. If the Backend has already been instantiated, return the cached value.
If the wanted object does not exist, exception FileNotFoundError is raised.
"""
result, track_id = cls.instantiate_backend(name)
if track_id:
try:
result = result[track_id]
except IndexError:
raise FileNotFoundError('{} not found'.format(GpxFile.identifier(result, track_id)))
return result
[docs] @classmethod
def instantiate_backend(cls, name: str):
"""Instantiate a Backend.
The full notation of an id_in_backend in a specific backend is
similiar to what scp expects:
Account:id_in_backend where Account is a reference to the accounts file.
Locally reachable files or directories may be written without the leading
Directory:. And a leading ~ is translated into the user home directory.
The trailing .gpx can be omitted. It will be removed anyway for id_in_backend.
If the file path of a local gpxfile (Directory) contains a ":", the file path
must be absolute or relative (start with "/" or with "."), or the full notation
with the leading Directory: is needed
Args:
name: The string identifier to be parsed
Returns: tuple()
* The first element is the Backend. If the Backend has already been instantiated, return the cached value.
If the wanted object does not exist, exception FileNotFoundError is raised.
* The second element is a track_id or None
"""
account, track_id = cls.parse_objectname(name)
cache_key = str(account)
if cache_key in cls.__all_backends:
result = cls.__all_backends[cache_key]
else:
result = cls.find_class(account.backend)(account)
cls.__all_backends[cache_key] = result
return result, track_id
@property
def subscription(self) ->str:
"""Get the subscription model. Like free, paid, plus, whatever.
If the backend has no subscription model, return None.
The unpaid subscription is granted to always return :literal:`free`.
The most expensive subscription is granted to always return :literal:`full`.
Intermediate values may vary.
Because I (the developer) have no paid account, I can test this only
partially. Feedback is welcome!
Returns: The name of the subscription or None
"""
return None