#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""This implements :class:`gpxity.directory.Directory`."""
# pylint: disable=protected-access
import os
import sys
import datetime
import tempfile
import logging
from collections import defaultdict
from gpxpy.gpx import GPXXMLSyntaxException
from .. import Backend, GpxFile, DirectoryAccount
from ..util import remove_directory
from ..gpx import Gpx
__all__ = ['Directory']
class Directory(Backend):
    """Uses a directory for storage.

    The filename minus the .gpx ending is used
    as :attr:`GpxFile.id_in_backend <gpxity.gpxfile.GpxFile.id_in_backend>`.

    If a gpxfile has a title but no id_in_backend,
    use the title as id_in_backend.
    Make the storage id unique by attaching a number if needed.
    A gpxfile without title gets a random name.

    The main directory (given by account.url) will have
    subdirectories YYYY/MM (year/month) with only the gpxfiles for one month.
    Those are symbolic links to the main file and have the same file name.

    If :meth:`~gpxity.backend.Backend.save` is given a value for ident, this
    is used as id, the file name will be :literal:`id.gpx`.
    Otherwise, this backend uses :attr:`GpxFile.title <gpxity.gpxfile.GpxFile.title>` for the id.
    If a gpxfile has no title, it uses a random sequence of characters.
    Changing the title also changes the id.

    Args:
        Account: If its url is unset, this will create a temporary
            directory named :attr:`prefix`.X where X are some random characters.
            It will be removed in __exit__ / detach.

    Attributes:
        fs_encoding (str): The encoding for file system names. By default, we
            expect the file system being able to handle arbitrary UTF-8 encoded names
            except character '/' and special names '.' and '..'. If needed, we will introduce
            new possible values for fs_encoding like perhaps 'windows'. Gpxity will **never**
            support any other character set but UTF-8.
            Note that :attr:`fs_encoding` is independent of the platform we are running on - we
            might use a network file system.

    """

    # NOTE(review): the title-based naming described above is not visible in
    # this class's own id creation code; presumably the base class triggers
    # it - confirm against Backend.

    # pylint: disable=abstract-method

    test_is_expensive = False
    accepts_zero_points = True
def __init__(self, account):
    """Set up the backend for a :class:`DirectoryAccount`.

    Args:
        account: The account to use. It must be a DirectoryAccount,
            this is asserted.

    Raises:
        Backend.BackendException: If the file system encoding reported by
            Python does not start with ``utf-8``.

    """
    assert isinstance(account, DirectoryAccount)
    super(Directory, self).__init__(account)
    encoding = sys.getfilesystemencoding()
    self.fs_encoding = encoding
    if not encoding.lower().startswith('utf-8'):
        message = (
            'Backend Directory needs a unicode file system encoding, {} has {}.'
            ' Please change your locale settings.')
        raise Backend.BackendException(message.format(self, encoding))
    # Maps id_in_backend to the list of symlink paths pointing at its file.
    self._symlinks = defaultdict(list)  # TODO: account.symlinks True
    self._load_symlinks()
def __str__(self) ->str:
"""Used for formatting strings. Must be unique within the process.
Returns:
a unique identifier"""
result = self.url
if result:
if result.startswith('./'):
result = result[2:]
else:
result = '.'
return result
@staticmethod
def _strip_gpx(name: str) ->str:
"""If it is there, strip traling .gpx.
Returns:
The stripped string.
"""
if name.endswith('.gpx'):
return name[:-4]
return name
def _load_symlinks(self, directory=None):
"""scan the subdirectories with the symlinks.
If the content of a gpxfile changes, the symlinks might have to
be adapted. But we do not know the name of the existing symlink anymore.
So just scan them all and assign them to id_in_backend."""
if directory is None:
directory = self.url
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
full_name = os.path.join(dirpath, filename)
if os.path.islink(full_name):
if os.path.exists(full_name):
target = os.readlink(full_name)
gpx_target = self._strip_gpx(os.path.basename(target))
if full_name not in self._symlinks[gpx_target]:
self._symlinks[gpx_target].append(full_name)
else:
os.remove(full_name)
def _new_id_from(self, ident_proposal: str) ->str:
"""Return not yet existant file name.
Args:
ident_proposal: If this proposal does not lead to a valid ident, create unique random ident.
Returns:
The new unique ident
"""
value = self._sanitize_name(ident_proposal)
if not value:
value = os.path.basename(tempfile.NamedTemporaryFile(dir=self.url, prefix='').name)
return self._make_ident_unique(value)
@staticmethod
def _make_path_unique(value) ->str:
"""If the file name already exists, apply a serial number.
If value ends with .gpx, put the serial number in front of that.
Returns:
the unique path name
"""
ctr = 0
unique_value = value
while os.path.exists(unique_value):
ctr += 1
if value.endswith('.gpx'):
unique_value = '{}.{}.gpx'.format(value[:-4], ctr)
else:
unique_value = '{}.{}'.format(value, ctr)
return unique_value
def _make_ident_unique(self, value):
"""Return a unique ident."""
path = Directory._make_path_unique(os.path.join(self.url, value + '.gpx'))
return os.path.basename(path)[:-4]
@staticmethod
def _sanitize_name(value) ->str:
"""Change it to legal file name characters.
Returns:
the sanitized name
"""
if value is None:
return None
return value.replace('/', '_')
[docs] def gpx_path(self, ident) ->str:
"""The full path name for the local copy of a gpxfile.
Returns:
The full path name
"""
assert isinstance(ident, str), '{} must be str'.format(ident)
return os.path.join(self.url, '{}.gpx'.format(ident))
def _list_gpx(self):
"""return a generator of all gpx files, with .gpx removed.
Returns:
A list of all gpx file names with .gpx removed
"""
gpx_names = (x for x in os.listdir(self.url) if x.endswith('.gpx'))
return (x.replace('.gpx', '') for x in gpx_names)
@staticmethod
def _get_field(data, name) ->str:
"""Get xml field out of data.
Returns:
The xml field
"""
start_html = '<{}>'.format(name)
end_html = '</{}>'.format(name)
data = data.split(end_html)
if len(data) > 1:
data = data[0]
data = data.split(start_html)
if len(data) > 1:
data = data[-1]
if start_html not in data:
return data
return None
def _gpx_from_headers(self, ident):
    """Quick scan of a file for getting some header fields.

    At most the first 100000 characters are read. Everything after the
    first point is removed (or, if there is no point, everything after
    metadata), the needed closing tags are appended and the result is
    parsed.

    If neither a point nor metadata is found, or if parsing fails,
    an empty Gpx is returned.

    Returns: Gpx

    """
    self.dump_ids('_gpx_from_headers', ident)
    result = Gpx()
    with open(self.gpx_path(ident), encoding='utf8') as raw_file:
        data = raw_file.read(100000)

    head = None
    up_to_point, point_found, _ = data.partition('</trkpt>')
    if point_found:
        head = up_to_point + '</trkpt></trkseg></trk></gpx>'
    else:
        up_to_metadata, metadata_found, _ = data.partition('</metadata>')
        if metadata_found:
            head = up_to_metadata + '</metadata></gpx>'
    if not head:
        return result

    try:
        result = Gpx.parse(head, is_complete=False)
        self.logger.info('Incompletely parsed %s', result)
    except GPXXMLSyntaxException:
        self.logger.info(
            '%s: GpxFile metadata cannot be extracted, there is too much',
            GpxFile.identifier(self, ident))
    return result
def _list(self):
"""get all gpxfiles for this user."""
self._symlinks = defaultdict(list)
self._load_symlinks()
for _ in self._list_gpx():
gpx = self._gpx_from_headers(_)
gpx.is_complete = False
self._found_gpxfile(_, gpx)
def _read(self, gpxfile):
    """Parse the whole file of gpxfile and assign the result to gpxfile.gpx.

    Raises:
        GPXXMLSyntaxException: If the file cannot be parsed. This is
            logged as an error and then raised again.

    """
    ident = gpxfile.id_in_backend
    self.dump_ids('_read', ident)
    read_filename = self.gpx_path(ident)
    with open(read_filename, encoding='utf-8') as in_file:
        content = in_file.read()
    try:
        gpxfile.gpx = Gpx.parse(content)
    except GPXXMLSyntaxException:
        self.logger.error('%s cannot be parsed', read_filename)
        raise
def _remove_symlinks(self, ident: str):
"""Remove its symlinks, empty symlink parent directories."""
for symlink in self._symlinks[ident]:
if os.path.exists(symlink):
os.remove(symlink)
symlink_dir = os.path.split(symlink)[0]
try:
os.removedirs(symlink_dir)
except OSError:
pass
self._symlinks[ident] = list()
def _remove_ident(self, ident: str):
"""Remove its symlinks and the file, in this order."""
self._remove_symlinks(ident)
gpx_file = self.gpx_path(ident)
if os.path.exists(gpx_file):
os.remove(gpx_file)
def _symlink_path(self, gpxfile) ->str:
"""The path for the speaking symbolic link: YYYY/MM/title.gpx.
Missing directories YYYY/MM are created.
Returns:
The path
"""
ident = gpxfile.id_in_backend
time = gpxfile.first_time or datetime.datetime.fromtimestamp(os.path.getmtime(self.gpx_path(ident)))
by_month_dir = os.path.join(self.url, '{}'.format(time.year), '{:02}'.format(time.month)) # noqa
if not os.path.exists(by_month_dir):
os.makedirs(by_month_dir)
else:
# make sure there is no dead symlink with our wanted name.
self._load_symlinks(by_month_dir)
name = gpxfile.title or ident
name += '.gpx'
return self._make_path_unique(os.path.join(by_month_dir, self._sanitize_name(name)))
def _new_ident(self, gpxfile):
"""Create an id for gpxfile.
Returns: The new ident.
"""
ident = gpxfile.id_in_backend
if ident is None:
if self.account.id_method == 'counter':
try:
ident = str(max(int(x) for x in self._list_gpx()) + 1)
except ValueError:
ident = '1'
else:
ident = self._new_id_from(None)
return ident
def _make_symlinks(self, gpxfile):
"""Make all symlinks for gpxfile."""
ident = gpxfile.id_in_backend
gpx_pathname = self.gpx_path(ident)
link_name = self._symlink_path(gpxfile)
basename = os.path.basename(gpx_pathname)
link_target = os.path.join('..', '..', basename)
os.symlink(link_target, link_name)
if link_name not in self._symlinks[ident]:
self._symlinks[ident].append(link_name)
def _change_ident(self, gpxfile, new_ident: str):
"""Change the id in the backend. Make it unique if needed."""
assert gpxfile.id_in_backend != new_ident
unique_id = self._new_id_from(new_ident)
self._remove_symlinks(gpxfile.id_in_backend)
self.logger.info('%s: renamed %s to %s', self.account, gpxfile.id_in_backend, unique_id)
os.rename(self.gpx_path(gpxfile.id_in_backend), self.gpx_path(unique_id))
gpxfile.id_in_backend = unique_id
self._make_symlinks(gpxfile)
def _write_all(self, gpxfile) ->str:
"""save full gpx gpxfile.
Since the file name uses title and title may have changed,
compute new file name and remove the old files. We also adapt gpxfile.id_in_backend.
Returns:
the new gpxfile.id_in_backend
"""
new_ident = self._new_ident(gpxfile)
gpxfile.id_in_backend = new_ident
new_path = self.gpx_path(new_ident)
tmp_path = new_path + '.new'
if os.path.exists(tmp_path):
os.remove(tmp_path)
with open(tmp_path, 'w', encoding='utf-8') as out_file:
out_file.write(gpxfile.xml())
time = gpxfile.first_time
if time:
os.utime(tmp_path, (time.timestamp(), time.timestamp()))
os.replace(tmp_path, new_path)
logging.debug('written %s', new_path)
self.dump_ids('_write_all after os.replace', new_ident)
return new_ident
[docs] def dump_ids(self, prefix, ident):
"""For debugging show the IDs found in a file."""
if self.logger.isEnabledFor(logging.DEBUG):
with open(self.gpx_path(ident), 'r', encoding='utf-8') as written_file:
for line in written_file:
if 'Id:' in line:
self.logger.debug('%s: ident:%s ID-Line:%s', prefix, ident, line)
def detach(self):
    """Detach like the base class does, then remove the directory if the account is temporary."""
    super(Directory, self).detach()
    if not self.account.is_temporary:
        return
    remove_directory(self.url)
@classmethod
def _check_id_legal(cls, value):
"""Check if value is a legal id.
If not, raise ValueError.
"""
# it is not necessary to call BackendBase._check_id_legal
if value is not None:
if '/' in value:
raise ValueError('/ not allowed as id_in_backend for Directory')
if value == '.':
raise ValueError('. not allowed as id_in_backend for Directory')