#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""This module defines :class:`~gpxity.backend.Backend`."""
# pylint: disable=protected-access
import os
import datetime
from inspect import getmembers, isfunction, isclass, getmro
import dis
from contextlib import contextmanager
import logging
import importlib
from .auth import Authenticate
from .track import Track, Fences
from .util import collect_tracks
__all__ = ['Backend']
[docs]class Backend:
"""A place where tracks live. Something like the filesystem or http://mapmytracks.com.
A Backend should hold only tracks for one person, and they
should not overlap in time. This is not enforced but sometimes
behaviour is undefined if you ignore this.
A Backend be used as a context manager. Upon termination, all tracks
may be removed automatically by setting cleanup=True. Some concrete
implementations may also remove the backend itself.
A Backend allows indexing by normal int index, by :class:`Track <gpxity.track.Track>`
and by :attr:`Track.id_in_backend <gpxity.track.Track.id_in_backend>`.
:literal:`if 'ident' in backend` is possible.
len(backend) shows the number of tracks. Please note that Code
like :literal:`if backend:` may not behave as expected. This will be False if the backend
has no track. If that is not what you want, consider :literal:`if backend is not None`
The backend will automatically synchronize. So something like :literal:`len(Backend())` will work.
However, some other Backend pointing to the same storage or even a different process
might change things. If you want to cope with that, use :meth:`scan`.
Not all backends support all methods. The unsupported methods
will raise NotImplementedError. As a convenience every backend
has a list **supported** to be used like :literal:`if 'track' in backend.supported:`
where `track` is the name of the method.
Backends support no locking. If others modify a backend concurrently, you may
get surprises. It is up to you to handle those.
Some backends may use cookies.
Args:
url (str): Initial value for :attr:`url`
auth (str): The username.
This will lookup the password and config from :class:`Authenticate <gpxity.auth.Authenticate>`.
You can also pass a dict containing what would normally be obtained from
:class:`Authenticate <gpxity.auth.Authenticate>`. The dict must also contain 'Username'.
cleanup (bool): If true, :meth:`destroy` will remove all tracks.
Attributes:
supported (set(str)): The names of supported methods. Creating the first instance of
the backend initializes this. Only methods which may not be supported are mentioned here.
If a particular value write_* like write_public does not exist, the entire track is written instead
which normally results in a new ident for the track.
full_support (set(str)): All possible values for the supported attribute.
url (str): the address. May be a real URL or a directory, depending on the backend implementation.
Every implementation may define its own default for url. Must never end with '/' except for
Directory(url='/').
timeout: If None, there are no timeouts: Gpxity waits forever. For legal values
see http://docs.python-requests.org/en/master/user/advanced/#timeouts
fences: The fences as found in config. You can programmatically change them but they will
never be applied to already existing data.
needs_config: If True, the Backend class expects data in auth.cfg
config: A Section with all entries in auth.cfg for this backend
config.fences: The backend will never write points within fences.
You can define any number of fences separated by spaces. Every fence is a circle.
It has the form Lat/Long/meter.
Lat and Long are the center position in decimal degrees, meter is the radius.
test_is_expensive: For internal use. If True, the self tests will reduce test cases and try to
avoid too much usage of the backend.
max_field_sizes: Some backends have a limited size for some attributes like keywords. This
is only an approximative guess. The backend will not protect you from overriding it
but the unittests will try to stay within those limits.
point_precision: The precision supported by this backend. We are never more precise than 6.
That is the digits after the decimal separator.
supported_categories: The categories supported by this backend. The first one is used as default.
"""
# pylint: disable=too-many-instance-attributes
[docs] class NoMatch(Exception):
"""Is raised if a track is expected to pass the match filter but does not"""
[docs] class BackendException(Exception):
"""Is raised for general backend exceptions, especially error messages from a remote server"""
supported = set()
default_url = None # Override in the backends
needs_config = True
test_is_expensive = True
max_field_sizes = {}
_category_decoding = dict()
_category_encoding = dict()
full_support = (
'scan', 'remove', 'write', 'write_title', 'write_public',
'own_categories',
'write_category', 'write_description', 'keywords', 'write_add_keywords', 'write_remove_keywords')
_max_length = dict()
_default_public = False
point_precision = 5
# It is important that we have only one global session
# because gpsies.com seems to have several servers and their
# synchronization is sometimes slower than expected. See
# cookie "SERVERID".
_session = dict()
__all_backend_classes = None
__all_backends = dict()
def __init__(self, url: str = None, auth=None, cleanup: bool = False, timeout=None):
"""See class docstring."""
logging.debug('Backend(%s: url=%s, auth=%s)', self.__class__.__name__, url, auth)
if self.is_disabled():
raise Backend.BackendException('class {} is disabled'.format(self.__class__.__name__))
self._decoupled = False
self.__tracks = list()
self._tracks_fully_listed = False
if isinstance(url, Authenticate):
self.config = url
else:
self.config = Authenticate(self, url, auth)
self._cleanup = cleanup
self.__match = None
self.logger = logging.getLogger(str(self))
self.timeout = timeout
self.fences = Fences(self.config.fences)
@property
def url(self):
"""get self.config['url'].
Returns: The url
"""
return self.config.url
def _has_default_url(self) ->bool:
"""Check if the backend has the default url.
Returns:
True if so
"""
if self.default_url is None:
return False
return self.url == self.default_url
def _get_author(self) ->str:
"""Get the username for the account.
Raise BackendException if no username is given.
Returns:
The username
"""
author = self.config.section.get('Username')
if not author:
raise Backend.BackendException('{} needs a username'.format(self.url))
return author
def __str__(self) ->str:
"""A unique identifier for every physical backend.
Two Backend() instances pointing to the same physical backend have the same identifier.
Returns:
A unique identifier
"""
url = ''
if not self._has_default_url():
url = self.url + '/'
result = '{}:{}{}'.format(
self.__class__.__name__.lower(),
url,
self.config.username)
return result
supported_categories = Track.categories
@contextmanager
def _decouple(self):
"""Context manager: disable automic synchronization for the backend.
In that state, automatic writes of changes into
the backend are disabled, and if you access attributes which
would normally trigger a full load from the backend, they will not.
Use this to avoid recursions.
You should never need this unless you implement a new backend.
"""
prev_decoupled = self._decoupled
self._decoupled = True
try:
yield
finally:
self._decoupled = prev_decoupled
@classmethod
def _is_implemented(cls, method) ->bool:
"""False if the first instruction in method raises NotImplementedError or if the method does nothing.
Returns:
True if method is implemented
"""
first_instruction = next(dis.get_instructions(method.__code__))
return first_instruction is not None and first_instruction.argval != 'NotImplementedError'
@classmethod
def _define_support(cls):
"""If the first thing a method does is raising NotImplementedError, it is marked as unsupported."""
support_mappings = {
# map internal names to more user friendly ones. See doc for
# Backend.supported.
'_load_track_headers': 'scan',
'_remove_ident': 'remove',
'_write_all': 'write'}
cls.supported = set()
cls.supported.add('keywords') # default
if cls.supported_categories != Track.categories:
cls.supported.add('own_categories')
for name, method in getmembers(cls, isfunction):
if name in support_mappings:
if cls._is_implemented(method):
cls.supported.add(support_mappings[name])
elif name.startswith('_write_') and name != '_write_attribute':
if cls._is_implemented(method):
cls.supported.add(name[1:])
@property
def match(self):
"""Filter tracks.
A function with one argument returning None or str. The backend will call
this with every track and ignore tracks where match does not return None.
The returned str should explain why the track does not match.
If you change a track such that it does not match anymore, the exception
NoMatch will be raised and the match stays unchanged.
"""
return self.__match
@match.setter
def match(self, value):
"""see match.getter."""
old_match = self.__match
self.__match = value
try:
self.scan()
except self.NoMatch:
self.__match = old_match
raise
[docs] @classmethod
def decode_category(cls, value: str) ->str:
"""Translate the value from the backend into one out of Track.categories.
Returns:
The decoded name
"""
if value in Track.categories:
return value
if value.capitalize() in Track.categories:
return value.capitalize()
if value not in cls._category_decoding:
raise cls.BackendException('{} gave us an unknown track type "{}"'.format(cls.__name__, value))
return cls._category_decoding[value]
[docs] @classmethod
def encode_category(cls, value: str) ->str:
"""Translate internal value (out of Track.categories) into the backend specific value.
Returns:
The encoded name
"""
if value in cls.supported_categories:
return value
if value.lower() in cls.supported_categories:
return value.lower()
if value in cls._category_encoding:
return cls._category_encoding[value]
for key, target in cls._category_decoding.items():
if value.lower() == target.lower():
return key
raise cls.BackendException('{} has no equivalent for "{}"'.format(cls.__name__, value))
@staticmethod
def _encode_keyword(value: str) ->str:
"""Replicate the translation the backend does. MMT for example capitalizes all words.
Returns:
the encoded keyword
"""
return value
@staticmethod
def _encode_description(track) ->str:
"""A backend might put keywords into the description. WPTrackserver does.
Returns: The string to be saved in the backend
"""
return track.description
def _decode_description(self, track, value, into_header_data=False):
"""A backend might put keywords into the description. WPTrackserver does.
Returns: The description
"""
assert self._decoupled
if into_header_data:
track._header_data['description'] = value
else:
track.description = value
return value
[docs] def get_time(self) ->datetime.datetime: # pylint: disable=no-self-use
"""get time from the server where backend is located as a Linux timestamp.
A backend implementation does not have to support this.
Returns: datetime.datetime
"""
return datetime.datetime.now()
def _change_id(self, track, new_ident: str):
"""Change the id in the backend."""
raise NotImplementedError
[docs] def scan(self, now: bool = False) ->None:
"""Enforce a reload of the list of all tracks in the backend.
This will be delayed until the list is actually needed again.
If this finds an unsaved track not matching the current match
function, an exception is thrown.
Saved Tracks not matching the current match will no be loaded.
Args:
now: If True, do not delay scanning.
"""
self._tracks_fully_listed = False
if now:
self._scan()
def _scan(self) ->None:
"""load the list of all tracks in the backend if not yet done.
Enforce this by calling :meth:`scan` first.
"""
if not self._tracks_fully_listed and not self._decoupled:
self._tracks_fully_listed = True
unsaved = [x for x in self.__tracks if x.id_in_backend is None]
if self.__match is not None:
for track in unsaved:
self.matches(track, 'scan')
self.__tracks = unsaved
if 'scan' in self.supported:
match_function = self.__match
self.__match = None
try:
# _load_track_headers loads ALL tracks, match will be
# applied in a second loop. This way the Backend implementations
# do not have to worry about the match code.
with self._decouple():
self._load_track_headers()
finally:
self.__match = match_function
if self.__match is not None:
self.__tracks = [x for x in self.__tracks if self.matches(x)]
def _found_track(self, ident: str):
"""Create an empty track for ident and inserts it into this backend.
Returns:
the new track
"""
result = Track()
with self._decouple():
result._set_backend(self)
result.id_in_backend = ident
self._append(result)
return result
def _load_track_headers(self):
"""Load all track headers and append them to the backend.
The tracks will not be loaded if possible.
"""
raise NotImplementedError()
def _read_all_decoupled(self, track) ->None:
"""Decouple and calls the backend specific _read_all."""
with self._decouple():
self._read_all(track)
def _read_all(self, track) ->None:
"""fill the track with all its data from source."""
raise NotImplementedError()
[docs] def matches(self, track, exc_prefix: str = None):
"""match track against the current match function.
Args:
exc_prefix: If not None, use it for the beginning of an exception message.
If None, never raise an exception
Returns:
True for match
"""
if self.__match is None:
return True
match_error = self.__match(track)
if match_error and exc_prefix:
raise Backend.NoMatch('{}: {} does not match: {}'.format(exc_prefix, track, match_error))
return match_error is None
def _needs_full_save(self, changes) ->bool:
"""Do we have to rewrite the entire track?.
Returns:
True if we must save fully
"""
for change in changes:
if change == 'all':
return True
write_name = 'write_{}'.format(change.split(':')[0])
if write_name not in self.supported:
return True
return False
[docs] def add(self, track):
"""
Add a track to this backend.
We do not check if it already exists in this backend. No track
already existing in this backend will be overwritten, the id_in_backend
of track will be deduplicated if needed. This is currently only needed
for Directory. Note that some backends reject a track if it is very
similar to an existing track even if it belongs to some other user.
If the track object is already in the list of tracks, raise ValueError.
If the track does not pass the current match function, raise an exception.
Args:
track (~gpxity.track.Track): The track we want to save in this backend.
Returns:
~gpxity.track.Track: The saved track. If the original track lives in a different
backend, a new track living in this backend will be created
and returned.
"""
if self._decoupled:
raise Exception('A backend cannot save() while being decoupled. This is probably a bug in gpxity.')
self.matches(track, 'add')
if track.backend is not self and track.backend is not None:
with track.fenced(self.fences):
new_track = track.clone()
else:
if any(x is track for x in self.__tracks):
raise ValueError('Already in list: Track {} with id={}'.format(track, id(track)))
new_track = track
try:
with self._decouple():
new_track._set_backend(self)
self._write_all(new_track)
self._append(new_track)
track._clear_dirty()
return new_track
except Exception:
# do not do self.remove. If we try to upload the same track to gpsies,
# gpsies will assign the same trackid, and we come here. __tracks will
# only hold the first uploaded track, and remove would remove that
# instance instead of this one.
# TODO: do we have a unittest for that case?
self.__tracks = [x for x in self.__tracks if x is not new_track]
with self._decouple():
new_track.id_in_backend = None
new_track._set_backend(None)
raise
def _new_ident(self, track) ->str:
"""Create an id for track.
Returns:
The new ident. If the backend does not
create an ident in advance, return None. Such
backends will return a new ident after writing.
"""
def _rewrite(self, track, changes):
"""Rewrite the full track.
Used only by Track when things change.
"""
assert track.backend is self
assert self._has_item(track.id_in_backend), '{}: its id_in_backend {} is not in {}'.format(
track, track.id_in_backend, ' / '.join(str(x) for x in self))
assert track._dirty
needs_full_save = self._needs_full_save(changes)
self.matches(track, '_rewrite')
if needs_full_save:
with track.fenced(self.fences):
new_id = self._write_all(track)
track.id_in_backend = new_id
else:
for change in changes:
_ = change.split(':')
write_name = '_write_{}'.format(_[0])
if len(_) == 1:
getattr(self, write_name)(track)
elif len(_) == 2:
getattr(self, write_name)(track, _[1])
else:
raise Exception('dirty {} got too many arguments:{}'.format(write_name, _[1:]))
def _write_all(self, track) ->str:
"""the actual implementation for the concrete Backend.
Writes the entire Track.
Returns:
The new id_in_backend
"""
raise NotImplementedError()
[docs] def remove(self, value) ->None:
"""
Remove track. This can also be done for tracks not passing the current match function.
Args:
value: If it is not an :class:`~gpxity.track.Track`, :meth:`remove` looks
it up by doing :literal:`self[value]`
"""
track = value if hasattr(value, 'id_in_backend') else self[value]
if track.id_in_backend:
self._remove_ident(track.id_in_backend)
with self._decouple():
track._set_backend(None)
try:
self.__tracks = [x for x in self.__tracks if x.id_in_backend != track.id_in_backend]
except ValueError:
pass
def _remove_ident(self, ident: str) ->None:
"""backend dependent implementation."""
raise NotImplementedError()
def _lifetrack_start(self, track, points) -> str: # pylint: disable=unused-argument
"""Modelled after MapMyTracks. I hope this matches other services too.
This will always produce a new track in the backend.
Default is to just add the points to the track.
Args:
track(Track): Holds initial data and points already added.
keep in mind that this process might restart which should
be invisible to the target.
points: Initial points
Returns: The new id_in_backend
For details see :meth:`Track.track() <gpxity.lifetrack.Lifetrack.start>`.
"""
if track.id_in_backend not in self:
track.id_in_backend = self.add(track.clone()).id_in_backend
return track.id_in_backend
def _lifetrack_update(self, track, points):
"""If the backend does not support lifetrack, just add the points to the track.
Args:
track(Track): Holds initial data
points: If None, stop tracking. Otherwise, start tracking
and add points.
For details see :meth:`Track.track() <gpxity.lifetrack.Lifetrack.update>`.
"""
self[track.id_in_backend].add_points(points)
def _lifetrack_end(self, track):
"""Default: Nothing needs to be done."""
[docs] def remove_all(self):
"""Remove all tracks we know about.
If their :attr:`id_in_backend` has meanwhile been changed
through another backend instance or another process, we
cannot find it anymore. We do **not** rescan all tracks in the backend.
If you want to make sure it will be empty, call :meth:`scan` first.
If you use a match function, only matching tracks will be removed."""
for track in list(self):
if self.matches(track):
self.remove(track)
[docs] def destroy(self):
"""If `cleanup` was set at init time, removes all tracks.
Some backends (example: :class:`Directory <gpxity.directory.Directory.destroy>`)
may also remove the account (or directory). See also :meth:`remove_all`."""
if self._cleanup:
self.remove_all()
def __contains__(self, value) ->bool:
"""value is either an a track or a track id.
Does NOT load tracks, only checks what is already known.
Returns:
True if we have the item
"""
self._scan()
return self._has_item(value)
def _has_item(self, index) ->bool:
"""like __contains__ but for internal use: does not call _scan first.
Must not call self._scan.
Returns:
True if we have the item
"""
if hasattr(index, 'id_in_backend') and index in self.__tracks:
return True
if isinstance(index, str) and index in [x.id_in_backend for x in self.__tracks]:
return True
return False
def __getitem__(self, index):
"""Allow accesses like alist[a_id].
Do not call this when implementing a backend because this always calls scan() first.
Instead use :meth:`_has_item`.
Returns:
the track
"""
self._scan()
if isinstance(index, int):
return self.__tracks[index]
for _ in self.__tracks:
if _ is index or _.id_in_backend == index:
return _
raise IndexError
def __len__(self) ->int:
"""do not call this when implementing a backend because this calls scan().
Returns:
the length
"""
self._scan()
return len(self.__tracks)
[docs] def real_len(self) ->int:
"""len(backend) without calling scan() first.
Returns:
the length"""
return len(self.__tracks)
def _append(self, track):
"""Append a track to the cached list."""
if track.id_in_backend is not None and not isinstance(track.id_in_backend, str):
raise Exception('{}: id_in_backend must be str'.format(track))
tracks_with_this_id = [x for x in self.__tracks if x.id_in_backend == track.id_in_backend]
if tracks_with_this_id:
assert len(tracks_with_this_id) == 1
track_with_this_id = tracks_with_this_id[0]
if track_with_this_id.backend is None:
# we actually replace the unsaved track with the new one
del self.__tracks[track_with_this_id]
if track.id_in_backend is not None and any(x.id_in_backend == track.id_in_backend for x in self.__tracks):
# cannot do "in self" because we are not decoupled, so that would call _scan()
raise ValueError(
'Backend.append(track {}): its id_in_backend {} is already in list: Track={}, list={}'.format(
str(track), track.id_in_backend, self[track.id_in_backend], self.__tracks))
self.matches(track, 'append')
self.__tracks.append(track)
def __repr__(self):
"""do not call len(self) because that does things.
Returns:
The repr str
"""
dirname = self.config.username
result = '{}({} in {}{})'.format(
self.__class__.__name__, len(self.__tracks), self.url, dirname)
return result
def __enter__(self):
"""See class docstring.
Returns:
self
"""
return self
def __exit__(self, exc_type, exc_value, trback):
"""See class docstring."""
self.destroy()
def __iter__(self):
"""See class docstring.
Returns:
iterator over tracks
"""
self._scan()
return iter(self.__tracks)
def __eq__(self, other) ->bool: # TODO: use str
"""True if both backends have the same tracks.
Returns:
True if both backends have the same tracks
"""
return {x.key() for x in self} == {x.key() for x in other}
def __copy(self, other_tracks, remove, dry_run):
"""Copy other_tracks into self. Used only by self.merge().
Returns:
verbose messages
"""
result = list()
for old_track in other_tracks:
if not dry_run:
new_track = self.add(old_track)
result.append('{} {} -> {}'.format(
'blind move' if remove else 'blind copy', old_track, '' if dry_run else new_track))
if remove and not dry_run:
old_track.remove()
return result
def __find_mergable_groups(self, tracks, partial_tracks: bool = False):
"""Find mergable groups.
Returns:
A list of tracks. The first one is the sink for the others
"""
result = list()
rest = list(self)
rest.extend(x for x in tracks if str(x.backend) != str(self))
while rest:
root = rest[0]
group = list([root])
group.extend(x for x in rest[1:] if root.can_merge(x, partial_tracks)[0] is not None) # noqa
# merge target should be the longest track in self:
group.sort(key=lambda x: (x.backend is self, x.gpx.get_track_points_no()), reverse=True)
result.append(group)
for _ in group:
rest = [x for x in rest if x is not _]
return result
[docs] def merge(self, other, remove: bool = False, dry_run: bool = False, copy: bool = False,
partial_tracks: bool = False) ->list: # noqa
"""merge other backend or a single track into this one. Tracks within self are also merged.
If two tracks have identical points, or-ify their other attributes.
Args:
other: The backend or a single track to be merged
remove: If True, remove merged tracks
dry_run: If True, do not really merge or remove
copy: Do not try to find a matching track, just copy other into this Backend
partial_tracks: If True, two tracks are mergeable if one of them contains the other one.
Returns:
list(str) A list of messages for verbose output
"""
# pylint: disable=too-many-branches,too-many-locals
# TODO: test for dry_run
# TODO: test for merging single track
# TODO: test for merging a backend or a track with itself. Where
# they may be identical instantiations or not. For all backends.
result = list()
other_tracks = collect_tracks(other)
if copy:
return self.__copy(other_tracks, remove, dry_run)
null_datetime = datetime.datetime(year=1, month=1, day=1)
groups = self.__find_mergable_groups(other, partial_tracks)
merge_groups = [x for x in groups if len(x) > 1]
merge_groups.sort(key=lambda x: x[0].time or null_datetime)
if merge_groups:
result.append('{} mergable groups:'.format(len(merge_groups)))
for _ in merge_groups:
result.append(' {} ----> {}'.format(', '.join(str(x) for x in _[1:]), _[0])) # noqa
for destination, *sources in groups:
if destination.backend is not self:
new_destination = destination if dry_run else self.add(destination)
result.append('{} {} -> {}'.format(
'move' if remove else 'copy', destination, self if dry_run else new_destination))
if remove and not dry_run:
destination.remove()
destination = new_destination
for source in sources:
result.extend(destination.merge(
source, remove=remove, dry_run=dry_run, partial_tracks=partial_tracks))
return result
@staticmethod
def _html_encode(value) ->str:
"""encode str to something gpies.com accepts.
Returns:
the encoded value
"""
if value is None:
return ''
return value.encode('ascii', 'xmlcharrefreplace').decode()
[docs] def flush(self):
"""Some backends delay actual writing. This enforces writing.
Currently, only the Mailer backend can delay, it will bundle all
mailed tracks into one mail instead of sending separate mails
for every track. Needed for lifetracking.
"""
[docs] @classmethod
def is_disabled(cls) ->bool:
"""True if this backend is disabled by env variable GPXITY_DISABLE_BACKENDS.
This variable is a comma separated list of Backend class names.
Returns:
True if disabled
"""
disabled = os.getenv('GPXITY_DISABLE_BACKENDS')
if not disabled:
return False
clsname = cls.__name__.split('.')[-1].lower()
return clsname in disabled.lower().split()
[docs] @classmethod
def find_class(cls, name: str):
"""Find the Backend class name "name".
if "name" contains a ":", only the part before will be used.
Args:
name: May be anycase (upper,lower)
Returns:
the backend class or None
"""
if ':' in name:
name = name.split(':')[0]
for _ in cls.all_backend_classes():
if _.__name__.lower() == name.lower():
return _
return cls.find_class('Directory')
[docs] @classmethod
def parse_objectname(cls, name):
"""Parse the full identifier for a track.
Args:
name: the full identifier for a Track
Returns:
A tuple with class, account, track_id
"""
account = track_id = None
backend_class = cls.find_class(name)
if 'Directory' in backend_class.__name__: # TODO: not nice
if not os.path.exists(name) and name.lower().startswith(backend_class.__name__.lower() + ':'): # noqa
name = ':'.join(name.split(':')[1:])
if os.path.isdir(name):
account = name
else:
id_name = name
if id_name.endswith('.gpx'):
id_name = id_name[:-4]
account = os.path.dirname(id_name) or '.'
track_id = os.path.basename(id_name) or None
else:
rest = ':'.join(name.split(':')[1:])
if '/' in rest:
if rest.count('/') > 1:
raise Exception('wrong syntax in {}'.format(name))
account, track_id = rest.split('/')
else:
account = rest
if account is None:
raise Exception('{} not found'.format(name))
return backend_class, account, track_id
[docs] @classmethod
def all_backend_classes(cls, exclude=None, needs=None):
"""Find all backend classes.
Args:
exclude: A list with classes to be excluded
needs: set(str) with needed supported actions
Returns:
A sorted list of all backend classes. Disabled backends are not
returned.
"""
if cls.__all_backend_classes is None:
backends_directory = os.path.join(os.path.dirname(__file__), 'backends')
if not os.path.exists(backends_directory):
raise Exception('we are not where we should be')
cls.__all_backend_classes = list()
mod_names = os.listdir(backends_directory)
for mod in mod_names:
if not mod.endswith('.py'):
continue
if mod == '__init__':
continue
try:
imported = importlib.import_module('.backends.{}'.format(mod[:-3]), __package__)
classes = (x[1] for x in getmembers(imported, isclass))
# isinstance and is do not work here
classes = (x for x in classes if Backend in getmro(x)[1:])
classes = [x for x in classes if not x.is_disabled()]
cls.__all_backend_classes.extend(classes)
except ImportError:
pass
cls.__all_backend_classes = set(cls.__all_backend_classes)
if exclude is None:
exclude = list()
if needs is None:
needs = set()
return sorted(
(x for x in cls.__all_backend_classes
if x not in exclude and needs < x.supported), key=lambda x: x.__name__)
[docs] def clone(self):
"""return a clone."""
return self.__class__(self.config)
[docs] @classmethod
def instantiate(cls, name, timeout=None):
"""Instantiate a Backend or a Track out of its identifier.
Args:
timeout: Needed for creating backends like MMT or GPSIES.
See :attr:`Backend.timeout <gpxity.backend.Backend.timeout>`
Returns:
A Track or a Backend. If the Backend has already been instantiated, return the cached value.
"""
# pylint: disable=too-many-branches
backend_class, account, track_id = Backend.parse_objectname(name)
clsname = backend_class.__name__
cache_key = (clsname, account)
if cache_key in cls.__all_backends:
result = cls.__all_backends[cache_key]
else:
result = backend_class(auth=account, timeout=timeout)
cls.__all_backends[cache_key] = result
if track_id:
try:
result = result[track_id]
except IndexError:
raise Exception('Backend {} does not have {}'.format(result, track_id))
assert result is not None
return result
def _get_current_keywords(self, track): # pylint:disable=no-self-use
"""A backend might be able to return the currently stored keywords.
This is useful for unittests: Compare the internal state with what the
backend actually says.
"""
return track.keywords