# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""Tests for gpxity.backends."""
import unittest
import os
import io
import datetime
import time
import random
from pkgutil import get_data
import tempfile
from contextlib import contextmanager
from subprocess import Popen, PIPE
import logging
try:
import MySQLdb
except ImportError:
pass
import gpxpy
from gpxpy.gpx import GPXTrackPoint
from gpxpy.geo import LocationDelta
from ... import GpxFile, Backend, Account, MemoryAccount, DirectoryAccount
from .. import Memory, Mailer, WPTrackserver, Directory, GPSIES, Openrunner, MMT
from ...util import remove_directory
from ...gpx import Gpx
# pylint: disable=attribute-defined-outside-init,protected-access
__all__ = ['BasicTest']
def disabled(*args) -> tuple:
    """Check whether any of the given backend classes is disabled.

    Args:
        args: the backend classes to check

    Returns:
        A tuple (disabled, reason): disabled is True if at least one backend
        is disabled, reason is a message naming the disabled backends.
    """
    # NOTE: the annotation used to be ``->tuple()`` which evaluates an empty
    # tuple instance as the annotation; the type ``tuple`` is meant.
    reason = [backend.__name__ for backend in args if backend.is_disabled()]
    return bool(reason), '{} {} disabled'.format(','.join(reason), 'is' if len(reason) == 1 else 'are')  # noqa
class BasicTest(unittest.TestCase):

    """define some helpers."""

    # Root password used for the temporary mysql docker instance.
    test_passwd = 'pwd'
    # Cached IP address of the mysql docker once found or created; None until then.
    mysql_ip_address = None
    # Name given to the docker container that runs the test mysql server.
    mysql_docker_name = 'gpxitytest_mysql'
def setUp(self):  # noqa
    """Prepare a test: logging, accounts path, a test specific DirectoryAccount.prefix, mailserver."""
    self.maxDiff = None  # pylint: disable=invalid-name
    Account.path = os.path.join(os.path.dirname(__file__), 'test_accounts')
    self.logger = logging.getLogger()
    self.logger.level = logging.DEBUG
    self.logger.debug('Using accounts out of %s', Account.path)
    self.start_time = datetime.datetime.now()
    self.unicode_string1 = 'unicode szlig: ß'
    self.unicode_string2 = 'something japanese:の諸問題'
    # remember the original prefix so tearDown can restore it
    self.org_dirprefix = DirectoryAccount.prefix
    # build a per-test prefix out of the test id, then turn it into a real temp directory
    DirectoryAccount.prefix = 'gpxity.' + '.'.join(self.id().split('.')[-2:]) + '_'  # noqa
    tmp_path = tempfile.mkdtemp(prefix=DirectoryAccount.prefix)
    Backend._session.clear()
    DirectoryAccount.prefix = tmp_path
    if not Mailer.is_disabled():
        self.start_mailserver()
def tearDown(self):  # noqa
    """Undo setUp: stop the mailserver, remove the temp directory, restore the prefix, log duration."""
    if not Mailer.is_disabled():
        self.stop_mailserver()
    remove_directory(DirectoryAccount.prefix)
    DirectoryAccount.prefix = self.org_dirprefix
    elapsed = datetime.datetime.now() - self.start_time
    self.logger.debug('%s seconds ', elapsed.seconds)
    logging.shutdown()
@contextmanager
def tst_backend(self, backend_cls, subtest=None):  # noqa pylint: disable=arguments-differ
    """With pytest, subTest does not do much. At least print the name."""
    banner = '---------- subTest {}: {} '.format(self.id().split('.')[-1], backend_cls.__name__)
    if subtest:
        banner += subtest + ' '
    # pad the banner with dashes up to 80 characters
    banner += '-' * (80 - len(banner))
    self.logger.debug(banner)
    yield super(BasicTest, self).subTest(' ' + backend_cls.__name__)
@staticmethod
def _get_track_from_test_file(name: str, backend_cls=None):
    """Load a GpxFile from a bundled test gpx file.

    Args:
        name: base name of the gpx file, without the .gpx suffix
        backend_cls: if given, assign a random category supported by that backend

    Returns:
        A GpxFile object
    """
    gpx_test_file = os.path.join(os.path.dirname(__file__), '{}.gpx'.format(name))
    if not os.path.exists(gpx_test_file):
        raise Exception('MMTTests needs a GPX file named {}.gpx for testing in {}'.format(
            name, os.getcwd()))
    raw_data = io.StringIO(get_data(__package__, '{}.gpx'.format(name)).decode('utf-8'))
    gpxfile = GpxFile(gpx=Gpx.parse(raw_data))
    if backend_cls:
        gpxfile.category = backend_cls.decode_category(random.choice(backend_cls.supported_categories))
    return gpxfile
@classmethod
def create_test_track(
        cls, backend_class=None, count: int = 1, idx: int = 0, category: str = None, public: bool = False,
        start_time=None, end_time=None):
    """Create a :class:`~gpxity.gpxfile.GpxFile`.

    It starts off with **test.gpx** and appends a last gpxfile point,
    it also changes the time stamp of the last point.
    This is done using **count** and **idx**: The last point is set such that
    looking at the gpxfiles, they all go in a different direction clockwise,
    with an angle in degrees of :literal:`360 * idx / count`.

    Args:
        backend_class: If given, use it as source for a random category
        count: See above. Using 1 as default if not given.
        idx: See above. Using 0 as default if not given.
        category: The wanted value for the gpxfile.
            Default: if count == len(:attr:`GpxFile.categories <gpxity.gpxfile.GpxFile.categories>`),
            the default value will be backend_class.supported_categories[idx].
            Otherwise a random value from backend_class.supported_categories will be applied.
        public: should the gpxfiles be public or private?
        start_time: If given, assign it to the first point and adjust all following times
        end_time: explicit time for the last point. If None: See above.

    Returns:
        (~gpxity.gpxfile.GpxFile): A new gpxfile not bound to a backend
    """
    # pylint: disable=too-many-locals
    gpxfile = cls._get_track_from_test_file('test')
    if start_time is not None:
        # make start_time timezone aware and shift the whole gpxfile accordingly
        if not start_time.tzinfo:
            start_time = start_time.replace(tzinfo=datetime.timezone.utc)
        gpxfile.adjust_time(start_time - gpxfile.first_time)
    last_point = gpxfile.last_point()
    if end_time is None:
        end_time = last_point.time + datetime.timedelta(hours=10, seconds=idx)
    if not end_time.tzinfo:
        end_time = end_time.replace(tzinfo=datetime.timezone.utc)
    extra_point = GPXTrackPoint(
        latitude=last_point.latitude, longitude=last_point.longitude + 0.001, time=end_time)
    # move the appended point 1km away, direction depending on idx
    extra_point.move(LocationDelta(distance=1000, angle=360 * idx / count))
    gpxfile.add_points([extra_point])
    # now set all times such that they are in order with this gpxfile and do not overlap
    # with other test gpxfiles
    duration = extra_point.time - gpxfile.first_time + datetime.timedelta(seconds=10)
    for point in gpxfile.gpx.walk(only_points=True):
        point.time += duration * idx
    gpxfile.title = 'Random GPX # {}'.format(idx)
    gpxfile.description = 'Description to {}'.format(gpxfile.title)
    if backend_class is None:
        cat_source = GpxFile.categories
    else:
        cat_source = [backend_class.decode_category(x) for x in backend_class.supported_categories]
    if category:
        gpxfile.category = category
    elif count == len(GpxFile.categories):
        gpxfile.category = cat_source[idx]
    else:
        gpxfile.category = random.choice(cat_source)
    gpxfile.public = public
    return gpxfile
@staticmethod
def _random_datetime():
"""random datetime between now() - 10 days and now().
Returns:
A random datetime
"""
end = datetime.datetime.now().replace(microsecond=0, tzinfo=datetime.timezone.utc)
start = end - datetime.timedelta(days=10)
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = random.randrange(int_delta)
return start + datetime.timedelta(seconds=random_second)
@staticmethod
def _random_keywords(count=100):
"""A set of random keywords, but always the same.
We do not want to generate too many tag ids for MMT.
Returns:
A set of random keywords
"""
state = random.getstate()
try:
random.seed(1)
basis = 'abcdefghijklmnopqrstuvwxyz'
basis += basis.upper()
basis += '/-_+.% $"|\\'
result = set()
while len(result) < count:
candidate = ''.join(random.choice(basis) for x in range(4))
if candidate[0] not in ' -' and candidate[-1] not in ' ':
result.add(candidate)
return result
finally:
random.setstate(state)
@classmethod
def _random_points(cls, count=100, root=None):
    """Get some random points.

    Distance between two consecutive points will never exceed 200m.

    Args:
        count: how many points to generate
        root: if given, use it as the first point instead of a random one

    Returns:
        A list with count points
    """
    start_time = cls._random_datetime()
    if root is None:
        root = GPXTrackPoint(
            latitude=random.uniform(-90.0, 90.0),
            longitude=random.uniform(-180.0, 180.0),
            elevation=0,
            time=start_time)
    result = [root]
    angle = 50
    for index in range(1, count):
        # wander on with a slightly changing direction
        angle += random.uniform(-20, 20)
        previous = result[-1]
        point = GPXTrackPoint(latitude=previous.latitude, longitude=previous.longitude)
        point.move(LocationDelta(distance=random.randrange(200), angle=angle))
        # elevation climbs monotonically, times are 10 seconds apart
        point.elevation = index
        point.time = start_time + datetime.timedelta(seconds=10 * index)
        result.append(point)
    return result
[docs] def assertBackendLength(self, backend, length): # noqa pylint: disable=invalid-name
"""Check length of backend."""
if len(backend) != length:
message = ','.join(str(x) for x in backend)
self.assertEqual(len(backend), length, '{} should have {} gpxfiles: {}'.format(backend, length, message))
def assertHasKeywords(self, gpxfile, expected):  # noqa pylint: disable=invalid-name
    """Check keywords. MMT shows keywords on the website lowercase but internally it capitalizes them."""
    if isinstance(gpxfile.backend, MMT):
        # capitalize every word of every keyword, like MMT does internally
        expected = [
            ' '.join(word.capitalize() for word in keyword.split(' '))
            for keyword in expected]
    self.assertEqual(gpxfile.keywords, sorted(expected))
def assertSameTracks(self, backend1, backend2, msg=None, with_category=True, with_last_time=None):  # noqa pylint: disable=invalid-name
    """Assert that both backends hold identical gpxfiles."""
    self.maxDiff = None  # pylint: disable=invalid-name
    if with_last_time is None:
        # those backends are known to rewrite point times, so ignore them by default
        time_unreliable = (Openrunner, GPSIES)
        with_last_time = not isinstance(backend1, time_unreliable) and not isinstance(backend2, time_unreliable)
    if backend1 != backend2:
        # compare with the coarser precision of the two backends
        precision = min(backend1.point_precision, backend2.point_precision)

        def sorted_keys(backend):
            """Sorted comparison keys for all gpxfiles in backend."""
            return sorted(x.key(with_category, with_last_time, precision=precision) for x in backend)

        self.assertEqual(sorted_keys(backend1), sorted_keys(backend2), msg)
def assertEqualTracks(self, gpxfile1, gpxfile2, msg=None, xml: bool = False, with_category: bool = True):  # noqa pylint: disable=invalid-name
    """Assert that both gpxfiles are identical. We test more than necessary for better test coverage.

    Args:
        gpxfile1: first gpxfile
        gpxfile2: second gpxfile
        msg: optional message for failing assertions
        xml: if True, also compare xml()
        with_category: if True, the category takes part in the comparison
    """
    self.maxDiff = None
    # GPSIES: when uploading gpxfiles. GPSIES sometimes assigns new times to all points,
    # starting at 2010-01-01 00:00. Until I find the reason, ignore point times for comparison.
    # Openrunner always does.
    # MMT now seems to convert times between utc and local time. UP- and downloading
    # the same gpxfile changes the point times.
    no_time_backend = (GPSIES, Openrunner, MMT)
    with_last_time = not (
        isinstance(gpxfile1.backend, no_time_backend) or isinstance(gpxfile2.backend, no_time_backend))
    # use the coarsest point precision of the default and both backends
    precision = Backend.point_precision
    if gpxfile1.backend and gpxfile1.backend.point_precision < precision:
        # BUGFIX: this read gpxfile1.backend.precision which does not match the guard above
        precision = gpxfile1.backend.point_precision
    if gpxfile2.backend and gpxfile2.backend.point_precision < precision:
        # BUGFIX: this read gpxfile2.backend.precision which does not match the guard above
        precision = gpxfile2.backend.point_precision
    self.assertEqual(
        gpxfile1.key(with_category, with_last_time, precision=precision),
        gpxfile2.key(with_category, with_last_time, precision=precision), msg)
    self.assertTrue(gpxfile1.points_equal(gpxfile2), msg)
    if xml:
        self.assertEqual(gpxfile1.gpx.xml(), gpxfile2.gpx.xml(), msg)
def assertNotEqualTracks(self, gpxfile1, gpxfile2, msg=None, with_category: bool = True):  # noqa pylint: disable=invalid-name
    """Assert that the gpxfiles differ. We test more than necessary for better test coverage."""
    key1 = gpxfile1.key(with_category)
    key2 = gpxfile2.key(with_category)
    self.assertNotEqual(key1, key2, msg)
    self.assertFalse(gpxfile1.points_equal(gpxfile2), msg)
    self.assertNotEqual(gpxfile1.gpx.xml(), gpxfile2.gpx.xml(), msg)
def assertTrackFileContains(self, gpxfile, string, msg=None):  # noqa pylint: disable=invalid-name
    """Assert that string is in the physical file. Works only for Directory backend."""
    gpx_path = gpxfile.backend.gpx_path(gpxfile.id_in_backend)
    with open(gpx_path, encoding='utf8') as trackfile:
        self.assertIn(string, trackfile.read(), msg)
def setup_backend(  # noqa pylint: disable=too-many-arguments
        self, cls_, test_name: str = None, url: str = None, count: int = 0,
        clear_first: bool = None, category: str = None,
        public: bool = None):
    """Set up an instance of a backend with count gpxfiles.

    If count == len(:attr:`GpxFile.categories <gpxity.gpxfile.GpxFile.categories>`),
    the list of gpxfiles will always be identical. For an example
    see :meth:`TestBackends.test_all_category <gpxity.backends.test.test_backends.TestBackends.test_all_category>`.

    Args:
        cls_ (Backend): the class of the backend to be created
        test_name: used to build a specific account name. Default account name is
            '<classname>_unittest'. (BUGFIX: this parameter was documented as
            ``username`` which does not exist.)
        url: for the backend, only for Directory
        count: how many random gpxfiles should be inserted?
        clear_first: if True, first remove all existing gpxfiles. None: do if the backend supports it.
        category: The wanted category, one out of GpxFile.categories. But this is a problem because we do the same
            call for all backend classes and they support different categories. So: If category is int, this is an
            index into Backend.supported_categories which will be decoded into GpxFile.categories
        public: should the gpxfiles be public or private? Default is False.
            Exception: MMT with subscription free has default True

    Returns:
        the prepared Backend
    """
    # pylint: disable=too-many-branches
    if clear_first is None:
        clear_first = 'remove' in cls_.supported
    if isinstance(category, int):
        category = cls_.decode_category(cls_.supported_categories[category])
    if cls_ is Memory:
        account = MemoryAccount()
    elif cls_ is Directory:
        account = DirectoryAccount(url)
    else:
        assert url is None
        kwargs = dict()
        if cls_ is WPTrackserver:
            if not self.find_mysql_docker():
                self.create_temp_mysqld()  # only once for all tests
            self.create_db_for_wptrackserver()  # recreate for each test
            kwargs['Url'] = self.mysql_ip_address
        if test_name:
            account_name = '{}_{}_unittest'.format(cls_.__name__, test_name)
        else:
            account_name = '{}_unittest'.format(cls_.__name__)
        account = Account(account_name, **kwargs)
    result = cls_(account)
    if clear_first and 'scan' in cls_.supported and 'write' in cls_.supported:
        result.remove_all()
    if public is None:
        # MMT free subscriptions can only hold public tracks
        public = cls_ is MMT and result.subscription == 'free'
    if count:
        # if count == 0, skip this. Needed for write-only backends like Mailer.
        while count > len(result):
            gpxfile = self.create_test_track(cls_, count, len(result), category=category, public=public)
            result.add(gpxfile)
        self.assertGreaterEqual(len(result), count)
        if clear_first:
            self.assertEqual(len(result), count)
    return result
@staticmethod
@contextmanager
def lifetrackserver(directory):
    """Start and stop a server for lifetrack testing.

    The server logs into gpxity_server.log inside directory. On exit,
    interesting log lines are forwarded to logging.debug, the temp files
    are removed and the server process is terminated.

    Args:
        directory: working directory for the server
    """
    exec_name = 'bin/gpxity_server'
    logfile = os.path.join(directory, 'gpxity_server.log')
    if not os.path.exists(exec_name):
        exec_name = 'gpxity_server'
    cmdline = '{} --loglevel debug --servername localhost --port 12398 {}'.format(
        exec_name, directory)
    user_filename = os.path.join(directory, '.users')
    if not os.path.exists(user_filename):
        with open(user_filename, 'w') as user_file:
            user_file.write('gpxitytest:gpxitytestpw\n')
    # BUGFIX: the log file handles were opened twice and never closed.
    # The child process gets its own duplicated descriptors, so closing
    # the parent's handle right after Popen is safe.
    with open(logfile, 'a') as loghandle:
        process = Popen(cmdline.split(), stdout=loghandle, stderr=loghandle)
    try:
        time.sleep(1)  # give the server time to start
        yield
    finally:
        if os.path.exists(user_filename):
            os.remove(user_filename)
        if os.path.exists(logfile):
            with open(logfile) as log:  # BUGFIX: was a leaked open()
                for line in log:
                    if 'INFO' not in line and 'HTTP/1.1' not in line:
                        logging.debug('SRV: %s', line.rstrip())
            os.remove(logfile)
        elif os.path.exists(directory):
            logging.debug('SRV: Directory exists but not gpxity_server.log')
        else:
            logging.debug('SRV: Directory %s does not exist', directory)
        Directory(DirectoryAccount(directory)).remove_all()
        process.kill()
        process.wait()  # reap the child to avoid a zombie
def start_mailserver(self):
    """Start an smtpd server for mail testing."""
    # BUGFIX: the stdout/stderr file handles were never closed in the parent.
    # Popen duplicates the descriptors for the child, so closing ours is safe.
    with open('{}/smtpd_stdout'.format(DirectoryAccount.prefix), 'w') as out_file, \
            open('{}/smtpd_stderr'.format(DirectoryAccount.prefix), 'w') as err_file:
        self.mailserver_process = Popen(
            'aiosmtpd -u -n -d -l 127.0.0.1:8025'.split(),
            stdout=out_file, stderr=err_file)
    time.sleep(1)  # give the server time to start
def stop_mailserver(self):
    """Stop the smtp server for mail testing and collect its output."""
    self.mailserver_process.kill()
    self.mailserver_process.wait()  # reap the child to avoid a zombie
    for _ in ('out', 'err'):
        filename = '{}/smtpd_std{}'.format(DirectoryAccount.prefix, _)
        if not os.path.exists(filename):
            logging.debug('MAIL: %s not found', filename)
            continue
        if _ == 'err':
            # forward non-INFO stderr lines to the debug log
            with open(filename) as smtpd_log:  # BUGFIX: was a leaked open()
                for fileline in smtpd_log:
                    if not fileline.startswith('INFO:'):
                        logging.debug('MAIL:%s: %s', _, fileline.rstrip())
        os.remove(filename)
@contextmanager
def temp_backend(self, cls_, url=None, count=0,  # pylint: disable=too-many-arguments
                 cleanup=True, clear_first=None, category=None,
                 public: bool = None, test_name=None):
    """Just like setup_backend but usable as a context manager. which will call detach() when done."""
    backend = self.setup_backend(cls_, test_name, url, count, clear_first, category, public)
    try:
        yield backend
    finally:
        # remove_all only if requested and supported, but always detach
        if cleanup and 'remove' in cls_.supported:
            backend.remove_all()
        backend.detach()
@contextmanager
def temp_directory(self, url=None, count=0,  # pylint: disable=too-many-arguments
                   cleanup=True, clear_first=None, category=None,
                   public: bool = None, test_name=None):
    """Temp directory backend."""
    backend = self.setup_backend(Directory, test_name, url, count, clear_first, category, public)
    try:
        yield backend
    finally:
        if cleanup:
            backend.remove_all()
        backend.detach()
@classmethod
def create_db_for_wptrackserver(cls):
    """Create the mysql database for the WPTrackserver tests.

    Connects to the mysql docker as root (retrying until the instance
    is ready), drops and recreates gpxitytest_db with the three tables
    WPTrackserver expects, and inserts the test user 'gpxitytest'.
    """
    # Retry connecting: the docker instance needs some time to come up.
    count = 0
    while True:
        try:
            server = MySQLdb.connect(
                host=cls.mysql_ip_address, user='root', passwd=cls.test_passwd, connect_timeout=2,
                autocommit=True, charset='utf8',
                sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION')
            break
        except MySQLdb._exceptions.OperationalError:
            # wait until the docker instance is ready
            count += 1
            if count > 50:
                # give up after about 50 seconds
                raise
            time.sleep(1)
    cursor = server.cursor()
    # suppress the warning "database does not exist" for the drop
    cursor._defer_warnings = True
    cursor.execute('drop database if exists gpxitytest_db')
    cursor._defer_warnings = False
    cursor.execute('create database gpxitytest_db')
    cursor.execute('use gpxitytest_db')
    # table for the single track points
    cursor.execute("""
CREATE TABLE wp_ts_locations (
id int NOT NULL AUTO_INCREMENT,
trip_id int NOT NULL,
latitude double NOT NULL,
longitude double NOT NULL,
altitude double NOT NULL,
speed double NOT NULL default 0.0,
heading double NOT NULL default 0.0,
updated timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created timestamp NOT NULL default '0000-00-00 00:00:00',
occurred timestamp not null default '0000-00-00 00:00:00',
comment varchar(255) NOT NULL,
hidden tinyint(1) NOT NULL default 0,
PRIMARY KEY (id),
KEY occurred (occurred),
KEY trip_id (trip_id)
) ENGINE=InnoDB
""")
    # table for the tracks; name/comment lengths follow WPTrackserver limits
    cursor.execute("""
CREATE TABLE wp_ts_tracks (
id int NOT NULL AUTO_INCREMENT,
user_id int NOT NULL,
name varchar({title_length}) NOT NULL,
updated timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created timestamp NOT NULL default '0000-00-00 00:00:00',
source varchar(255) NOT NULL default '',
comment varchar({descr_length}) NOT NULL default '',
distance int NOT NULL,
PRIMARY KEY (id),
KEY user_id (user_id)
)
""".format(
        descr_length=WPTrackserver._max_length['description'],
        title_length=WPTrackserver._max_length['title']))
    # minimal clone of the wordpress users table
    cursor.execute("""
CREATE TABLE wp_users (
ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
user_login varchar(60) NOT NULL DEFAULT '',
user_pass varchar(255) NOT NULL DEFAULT '',
user_nicename varchar(50) NOT NULL DEFAULT '',
user_email varchar(100) NOT NULL DEFAULT '',
user_url varchar(100) NOT NULL DEFAULT '',
user_registered datetime NOT NULL DEFAULT '1970-01-01',
user_activation_key varchar(255) NOT NULL DEFAULT '',
user_status int NOT NULL DEFAULT '0',
display_name varchar(250) NOT NULL DEFAULT '',
PRIMARY KEY (ID),
KEY user_login_key (user_login),
KEY user_nicename (user_nicename),
KEY user_email (user_email)
)
""")
    # create the test user the WPTrackserver unittest account logs in as
    cursor.execute("""
insert into wp_users (user_login) values(%s)""", ['gpxitytest'])
    cursor.execute("select id,user_login from wp_users")
    server.commit()
    server.close()
@classmethod
def find_mysql_docker(cls) -> bool:
    """Find an already running mysql docker and cache its IP address.

    Sets cls.mysql_ip_address as a side effect.

    Returns:
        True if a running docker was found, else False.
        (BUGFIX: the docstring used to claim the IP address is returned.)
    """
    if cls.mysql_ip_address is not None:
        return True
    # ask docker for the container's IP; empty output means it is not running
    with Popen([
            'docker', 'inspect', '--format',
            '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
            cls.mysql_docker_name], stdout=PIPE) as process:
        cls.mysql_ip_address = process.communicate()[0].decode().strip()
    if cls.mysql_ip_address == '':
        cls.mysql_ip_address = None
    return bool(cls.mysql_ip_address)
@classmethod
def create_temp_mysqld(cls):
    """Create a temporary mysql server and initialize it with test data for WPTrackserver."""
    docker_cmd = [
        'docker', 'run', '--name', cls.mysql_docker_name, '--detach',
        '--env', 'MYSQL_ROOT_PASSWORD={}'.format(cls.test_passwd), 'mysql']
    with Popen(docker_cmd, stdout=PIPE, stderr=PIPE) as docker_run:
        std_err = docker_run.stderr.read().strip().decode('utf8')
        if std_err:
            logging.error(std_err)
            if 'on network bridge: failed to add the host' in std_err:
                logging.error('did you reboot after kernel upgrade?')
            raise Exception('Cannot run docker: {}'.format(std_err))
    if not cls.find_mysql_docker():
        raise Exception('Unknown problem while creating mysql docker')