#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.
"""This implements :class:`gpxity.openrunner.Openrunner` for https://www.openrunner.com."""
# pylint: disable=protected-access
from html.parser import HTMLParser
import datetime
from collections import defaultdict
import logging
import requests
from gpxpy import gpx as mod_gpx
from .. import Backend
from ..gpx import Gpx
# Short alias for the gpxpy track point class; used when decoded points are built.
GPXTrackPoint = mod_gpx.GPXTrackPoint

# Disabled debugging aid: change the condition to True to set
# HTTPConnection.debuglevel to 1 for the http client module.
if False: # pylint: disable=using-constant-test
    try:
        import http.client as http_client
    except ImportError:
        # Python 2
        import httplib as http_client
    http_client.HTTPConnection.debuglevel = 1

# The only public name exported by this module.
__all__ = ['Openrunner']
class OpenrunnerRawTrack:
    """Plain record for one gpxfile row scraped from an Openrunner html page.

    Every field starts out as None; whoever parses the page fills in
    what it can find.
    """

    # pylint: disable=too-few-public-methods

    def __init__(self):
        """Create a record with all fields unset."""
        self.track_id = self.title = self.time = self.distance = self.category = None
class ParseOpenrunnerCategories(HTMLParser):  # pylint: disable=abstract-method
    """Collect the activity choices offered by the ``search_activity`` select box.

    ``result_names`` and ``result_ids`` are pre-seeded with 'Other' and 2.
    Options found inside the select box append their text and their
    integer value to those lists.
    """

    def __init__(self):
        """Set up the result lists and the parser state."""
        super().__init__()
        self.result_names = ['Other']
        self.result_ids = [2]
        self.current_tag = None
        self.seeing_list = False

    def feed(self, data):
        """Parse data, always starting outside of the select box."""
        self.seeing_list = False
        super().feed(data)

    def handle_starttag(self, tag, attrs):
        """Remember the tag and pick up option values inside the select box."""
        self.current_tag = tag
        # Missing attributes read as '' thanks to the defaultdict.
        attributes = defaultdict(str, attrs)
        if tag == 'select' and attributes['id'] == 'search_activity':
            self.seeing_list = True
        if tag == 'option' and self.seeing_list:
            option_value = attributes['value']
            if option_value != '':
                self.result_ids.append(int(option_value))

    def handle_endtag(self, tag):
        """Any closing select tag ends the list."""
        if tag == 'select':
            self.seeing_list = False

    def handle_data(self, data):
        """Record the text of an option, except for the placeholder entry."""
        if not self.seeing_list:
            return
        text = data.strip()
        if not text:
            return
        if self.current_tag == 'option' and text != 'Select an activity':
            self.result_names.append(text)
class ParseOpenrunnerSubscription(HTMLParser):  # pylint: disable=abstract-method
    """Find the card title that belongs to the active subscription.

    ``result`` receives the card title that was current at the moment the
    text 'Subscription in progress' showed up. It stays None if that text
    never appears.
    """

    def __init__(self):
        """Set up the parser state."""
        super().__init__()
        self.category = None
        self.seeing_card_title = False
        self.current_card_title = None
        self.result = None

    def handle_starttag(self, tag, attrs):
        """Note whether this element is a card title; a new title forgets the old one."""
        is_card_title = dict(attrs).get('class') == 'card-title'
        self.seeing_card_title = is_card_title
        if is_card_title:
            self.current_card_title = None

    def handle_data(self, data):
        """Remember card title text and react to the 'in progress' marker."""
        text = data.strip()
        if not text:
            return
        if self.seeing_card_title:
            self.current_card_title = text
        if text == 'Subscription in progress':
            self.result = self.current_card_title
class ParseOpenrunnerActivity(HTMLParser):  # pylint: disable=abstract-method
    """Parse the category value for a gpxfile from html.

    After feeding a page, ``category`` holds the ``id`` attribute of the last
    ``<input>`` element named ``trackTypes`` that carries a ``checked``
    attribute. It stays None if there is no such element.
    """

    def __init__(self):
        """See class docstring."""
        super().__init__()
        self.current_tag = None
        self.category = None

    def handle_starttag(self, tag, attrs):
        """Remember the tag and look for the checked trackTypes input."""
        self.current_tag = tag
        # TODO: class MyHTMLParser with attributes getter
        attributes = dict(attrs)
        # Use .get(): inputs without a name attribute are common and must
        # not abort parsing with a KeyError.
        if tag == 'input' and attributes.get('name') == 'trackTypes' and 'checked' in attributes:
            self.category = attributes.get('id')
class ParseOpenrunnerList(HTMLParser):  # pylint: disable=abstract-method
    """get some attributes available only on the web page.

    Of course, this is highly unreliable. Just use what we can get.

    Every ``<tr>`` inside a ``<tbody>`` starts a new
    :class:`OpenrunnerRawTrack`. The text of its ``<td>`` cells is assigned
    by column number (1-based):

    * 2: track id
    * 3: title (first text found within an ``<h6>``)
    * 4: category
    * 7: distance, parsed as float
    * 10: date as ``%d-%m-%Y``, stored as an UTC datetime; this column also
      appends the row to ``result['gpxfiles']``.
    """

    def __init__(self):
        """See class docstring."""
        super().__init__()
        self.result = {'gpxfiles': []}
        self.gpxfile = None
        self.column = 0
        self.current_tag = None
        self.seeing_list = False
        self.after_list = False
        self.seeing_a = False

    def feed(self, data):
        """Parse data. The parsing state is reset, collected results are kept."""
        self.gpxfile = None
        self.column = 0
        self.current_tag = None
        self.seeing_list = False
        self.after_list = False
        self.seeing_a = False
        super().feed(data)

    def handle_starttag(self, tag, attrs):
        """starttag from the parser: track rows and columns within tbody."""
        self.current_tag = tag
        if tag == 'tbody':
            self.seeing_list = True
        if not self.seeing_list:
            return
        if tag == 'tr':
            self.gpxfile = OpenrunnerRawTrack()
            self.column = 0
            self.seeing_a = False
        elif tag == 'td':
            self.column += 1
        elif self.after_list and tag == 'a':
            self.seeing_a = True

    def handle_endtag(self, tag):
        """handle end of gpxfile list."""
        if tag == 'tbody':
            self.seeing_list = False
            self.after_list = True

    def handle_data(self, data):
        """data from the parser: fill the current row by column number."""
        data = data.strip()
        if not data or not self.seeing_list:
            return
        if self.gpxfile is None:
            # cell data before any <tr>: there is no row to attach it to
            return
        column = self.column
        if column == 2:
            self.gpxfile.track_id = data
        elif column == 3:
            if self.current_tag == 'h6' and self.gpxfile.title is None:
                self.gpxfile.title = data
        elif column == 4:
            self.gpxfile.category = data
        elif column == 7:
            self.gpxfile.distance = float(data)
        elif column == 10:
            self.gpxfile.time = datetime.datetime.strptime(
                data, '%d-%m-%Y').replace(tzinfo=datetime.timezone.utc)
            self.result['gpxfiles'].append(self.gpxfile)
class Openrunner(Backend):
    """The implementation for openrunner.com.

    The gpxfile ident is the ID given by openrunner.

    Searching arbitrary gpxfiles is not supported. Openrunner only looks at the
    gpxfiles of a specific user.

    Args:
        account (:class:`~gpxity.accounts.Account`): The account to be used.
            Alternatively a dict can be passed to build an ad hoc :class:`~gpxity.accounts.Account`
            instance.

    """

    # pylint: disable=abstract-method

    max_field_sizes = {'keywords': 200}

    _default_description = 'None yet. Let everyone know how you got on.'

    point_precision = 5

    supported_categories = (
        'Cycling - Road',
        'Canoe-Kayak',
        'Cycling - Gravel',
        'Cycling - MTB',
        'Cycling - Touring',
        'Footbiking',
        'Hiking',
        'Horse riding',
        'Longboard',
        'Nordic walking',
        'River navigation',
        'Rollerblading',
        'Running - Trail',
        'Running - Urban Trail',
        'Running- Road',
        'Skiing - Backcountry',
        'Skiing - Crosscountry',
        'Skiing - Rollerskiing',
        'Skiing - Touring',
        'Snowshoeing',
        'Stand Up Paddle',
        'Swimming',
        'Swimrun',
        'Walking',
        'Other'
    )

    # translate Openrunner names to internal names
    _category_decoding = {
        'Canoe-Kayak': 'Canoeing',
        'Footbiking': 'Cycling - Foot',
        'Longboard': 'Miscellaneous',
        'Other': 'Miscellaneous',
        'River navigation': 'Miscellaneous',
        'Rollerblading': 'Skating - Inline',
        'Running - Trail': 'Running',
        'Running - Urban Trail': 'Running',
        'Running- Road': 'Running - Road',
        'Skiing - Rollerskiing': 'Skiing - Roller',
        'Stand Up Paddle': 'Stand up paddle boarding',
        'Swimrun': 'Swimming',
    }

    # translate internal names to Openrunner names
    # NOTE(review): 'Nordic walking', 'Snowshoeing' and 'Skiing - Touring' are
    # listed in supported_categories but are mapped to a different Openrunner
    # name here - confirm that this is intended.
    _category_encoding = {
        'Cabriolet': 'Other',
        'Canoeing': 'Canoe-Kayak',
        'Coach': 'Other',
        'Crossskating': 'Other',
        'Cycling': 'Cycling - Road',
        'Cycling - Foot': 'Footbiking',
        'Cycling - Hand': 'Cycling - Road',
        'Cycling - Indoor': 'Other',
        'Driving': 'Other',
        'Enduro': 'Other',
        'Flying': 'Other',
        'Geocaching': 'Other',
        'Gliding': 'Other',
        'Hang gliding': 'Other',
        'Hiking - Speed': 'Hiking',
        'Hot air ballooning': 'Other',
        'Jet skiing': 'Other',
        'Kayaking': 'Canoe-Kayak',
        'Kiteboarding': 'Other',
        'Miscellaneous': 'Other',
        'Motor racing': 'Other',
        'Motorcycling': 'Other',
        'Motorhome': 'Other',
        'Mountaineering': 'Other',
        'Nordic walking': 'Other',
        'Off road driving': 'Other',
        'Orienteering': 'Other',
        'Pack animal trekking': 'Horse riding',
        'Paragliding': 'Other',
        'Pedelec': 'Other',
        'Powerboating': 'River navigation',
        'Rowing': 'River navigation',
        'Running': 'Running- Road',
        'Running - Road': 'Running- Road',
        'Sailing': 'Other',
        'Sea kayaking': 'Canoe-Kayak',
        'Sightseeing': 'Other',
        'Skateboarding': 'Other',
        'Skating': 'Other',
        'Skating - Inline': 'Rollerblading',
        'Skiing': 'Skiing - Touring',
        'Skiing - Alpine': 'Other',
        'Skiing - Nordic': 'Skiing - Backcountry',
        'Skiing - Touring': 'Skiing - Backcountry',
        'Snowboarding': 'Other',
        'Snowshoeing': 'Other',
        'Stand up paddle boarding': 'Stand Up Paddle',
        'Train': 'Other',
        'Wheelchair': 'Other',
        'Windsurfing': 'Other',
        'Wintersports': 'Other',
    }

    # translate Openrunner names to Openrunner numbers
    _legal_categories_numbers = {
        'Canoe-Kayak': 18,
        'Cycling - Gravel': 20,
        'Cycling - MTB': 3,
        'Cycling - Road': 1,
        'Cycling - Touring': 11,
        'Footbiking': 13,
        'Hiking': 9,
        'Horse riding': 4,
        'Longboard': 25,
        'Nordic walking': 12,
        'Other': 2,
        'River navigation': 8,
        'Rollerblading': 5,
        'Running - Trail': 10,
        'Running - Urban Trail': 19,
        'Running- Road': 21,
        'Skiing - Backcountry': 14,
        'Skiing - Crosscountry': 15,
        'Skiing - Rollerskiing': 16,
        'Skiing - Touring': 22,
        'Snowshoeing': 17,
        'Stand Up Paddle': 24,
        'Swimming': 6,
        'Swimrun': 23,
        'Walking': 7,
    }

    # Sanity check at class creation: every supported category has a number and vice versa.
    assert set(_legal_categories_numbers.keys()) == set(supported_categories)

    default_url = 'https://www.openrunner.com'

    @staticmethod
    def __encode_number(nbr) -> str:
        """Encode a single unsigned number.

        The number is emitted in 5-bit groups, lowest group first. Every
        group but the last is offset by 95 (63 + 32, where 32 marks
        "more groups follow"); the last one is offset by 63.
        NOTE(review): this looks like Google's encoded polyline format - confirm.

        Returns: the encoded string

        """
        chunks = []
        while nbr >= 32:
            chunks.append(chr(95 + (nbr & 31)))
            nbr >>= 5
        chunks.append(chr(63 + nbr))
        return ''.join(chunks)

    @classmethod
    def __encode_signed_number(cls, nbr) ->str:
        """Encode a single signed number.

        The sign is folded into the lowest bit: shift left by one and
        invert all bits for negative numbers, which yields an unsigned value.

        Returns: The encoded string

        """
        tmp = nbr << 1
        if nbr < 0:
            tmp = ~tmp
        return cls.__encode_number(tmp)

    @classmethod
    def _encode_points(cls, points) ->str:
        """Encode a list of points.

        Coordinates are scaled by 100000 and rounded. Each point is written
        as the signed difference to the previous point (the first one
        relative to 0/0), latitude before longitude.

        Returns: the encoded string

        """
        chunks = []
        prev_lat = 0
        prev_lon = 0
        for point in points:
            lat = round(point.latitude * 100000)
            lon = round(point.longitude * 100000)
            chunks.append(cls.__encode_signed_number(lat - prev_lat))
            chunks.append(cls.__encode_signed_number(lon - prev_lon))
            prev_lat = lat
            prev_lon = lon
        return ''.join(chunks)

    @staticmethod
    def _decode_points(input_str) ->list:
        """Decode str into a list of points.

        This is the inverse of :meth:`_encode_points`. Truncated input
        raises IndexError.

        Returns: list(GPXTrackPoint)

        """
        if not input_str:
            return list()

        # Walk the string with an index instead of re-slicing it for every
        # character, which would be quadratic for long tracks.
        pos = 0
        length = len(input_str)

        def decode_number():
            """Decode a single signed number starting at pos."""
            nonlocal pos
            result = 0
            shift = 0
            while True:
                ord_value = ord(input_str[pos]) - 63
                pos += 1
                result |= (31 & ord_value) << shift
                if ord_value < 32:
                    # no continuation bit: this was the last 5-bit group
                    break
                shift += 5
            if 1 & result:
                # lowest bit set: the number was negative
                return ~(result >> 1)
            return result >> 1

        def blow_up(nbr):
            """Scale the integer back to degrees."""
            return round(0.00001 * nbr, 5)

        result = list()
        latitude = longitude = 0
        while pos < length:
            latitude += decode_number()
            longitude += decode_number()
            result.append(GPXTrackPoint(latitude=blow_up(latitude), longitude=blow_up(longitude)))
        return result

    def _download_legal_categories(self):
        """Needed only for unittest.

        Returns: list(str)
            all legal values for category.

        """
        response = self.__get(action='route/search/page')
        category_parser = ParseOpenrunnerCategories()
        category_parser.feed(response.text)
        return sorted(category_parser.result_names)

    @property
    def subscription(self) ->str:
        """Get the subscription model.

        The value is downloaded once and then cached.

        Returns: The name of the subscription: 'free' if the web page
            reports 'Standard', else 'full'.

        """
        if self._cached_subscription is None:
            parser = ParseOpenrunnerSubscription()
            parser.feed(self.__get(action='user/mysubscription').text)
            if parser.result == 'Standard':
                self._cached_subscription = 'free'
            else:
                self._cached_subscription = 'full'
            self.logger.debug('%s: subscription: %s', self.account, self._cached_subscription)
        return self._cached_subscription

    @property
    def session(self):
        """The requests.Session for this backend. Only initialized once.

        If the account has a password, the session logs in and the login
        response is attached to the session as attribute ``response``.

        Returns:
            The session

        Raises:
            BackendException: the account has no username or the login failed.

        """
        ident = str(self)
        if ident not in self._session:
            if not self.account.username:
                raise self.BackendException('{}: Needs authentication data'.format(self.url))
            self._session[ident] = requests.Session()
            if self.account.password:
                data = {
                    'language': 'en',
                    'login': self.account.username,
                    'password': self.account.password,
                }
                self._session[ident].response = self._session[ident].post(
                    '{}/user/login'.format(self.url),
                    data=data, timeout=self.timeout)
                self._check_response(self._session_response, data)
            if self._session_response is None:
                self.logger.info('Openrunner.session got no _session_response')
        return self._session[ident]

    @property
    def _session_response(self):
        """The last response received.

        Returns: The response stored on the session by the login, or None
            if there is no session or no login happened.

        """
        # TODO: also in GPSIES
        ident = str(self)
        if ident in self._session:
            if hasattr(self._session[ident], 'response'):
                return self._session[ident].response
        return None

    def __http_post(self, post_type, action: str, data=None):
        """Common code for HTTP requests.

        Args:
            post_type: name of the requests method to use: "get", "post" or "delete"
            action: the path below the backend url
            data: the payload. It is not modified; a copy with a timestamp
                under the key '_' is sent.

        Returns: the response

        """
        # Work on a copy: adding the timestamp must not change the caller's dict.
        data = dict(data) if data is not None else dict()
        data['_'] = int(datetime.datetime.now().timestamp())
        full_url = '{}/{}'.format(self.url, action)
        self.session  # because headers needs accessToken pylint: disable=pointless-statement
        headers = {'X-Language': 'en'}
        if self._session_response:
            headers['Authorization'] = 'Bearer {}'.format(self._session_response.json()['user']['accessToken'])
            method = getattr(self.session, post_type)
        else:
            # not logged in: use a plain request without the session
            method = getattr(requests, post_type)
        response = method(full_url, data=data, headers=headers, timeout=self.timeout)
        self._check_response(response, data)
        return response

    def __get(self, action: str, data=None):
        """common code for a GET within the session.

        Returns: the response

        """
        return self.__http_post("get", action, data)

    def __post(self, action: str, data):
        """common code for a POST within the session.

        Returns: the response

        """
        return self.__http_post("post", action, data)

    def __delete(self, action: str, data):
        """common code for a DELETE within the session.

        Returns:
            the response

        """
        return self.__http_post("delete", action, data)

    @classmethod
    def decode_category(cls, value: str) ->str:
        """Translate the value from Openrunner into internal one.

        If the base class cannot decode value, value is tried as an
        Openrunner category number.

        Returns:
            The decoded name

        Raises:
            BackendException: value is neither a known name nor a known number.

        """
        try:
            return super(Openrunner, cls).decode_category(value)
        except Exception:
            if value == 'Autre':  # TODO: Should not happen
                logging.error("Openrunner said Autre")
                return 'Miscellaneous'
            reverse_nbr = {nbr: name for name, nbr in cls._legal_categories_numbers.items()}
            try:
                number = int(value)
            except (TypeError, ValueError):
                # neither a number nor a numeric string, None included
                number = None
            if number in reverse_nbr:
                return cls.decode_category(reverse_nbr[number])
            raise cls.BackendException('Openrunner gave us an unknown gpxfile type {}'.format(value))

    def _list(self):
        """get all gpxfiles for this user."""
        if self.account.password:
            response = self.__get(action='user/myroute')
        else:
            response = self.__get(action='route/findby?author={}'.format(self._get_author()))
        page_parser = ParseOpenrunnerList()
        page_parser.feed(response.text)
        for raw_data in page_parser.result['gpxfiles']:
            gpxfile = self._found_gpxfile(raw_data.track_id, Gpx())
            # the list page only delivers some metadata, no points
            gpxfile.gpx.is_complete = False
            gpxfile.title = raw_data.title
            gpxfile.time = raw_data.time
            if raw_data.distance:
                gpxfile.distance = raw_data.distance
            if raw_data.category:
                gpxfile.category = self.decode_category(raw_data.category)

    def _read(self, gpxfile):
        """Get the entire gpxfile."""
        response = self.__get(action='route/{}'.format(gpxfile.id_in_backend))
        route = response.json()['route']
        points = self._decode_points(route['shape']['full_encoded'])
        gpx = Gpx()
        gpx.add_points(points)
        gpx.name = route['name']
        gpx.description = route['description']
        gpx.keywords = route['keyword']
        gpxfile.gpx = gpx
        # the date format seems to depend on the language. fr would be %d-%m-%Y
        gpxfile.time = datetime.datetime.strptime(
            route['updatedDate'], '%Y/%m/%d').replace(tzinfo=datetime.timezone.utc)
        gpxfile.public = not route['private']
        gpxfile.category = self.decode_category(route['activity'])

    def _check_response(self, response, data):
        """are there error messages?.

        Any status code other than 200 raises BackendException. Some known
        server messages are translated into more helpful texts.

        """
        if response.status_code == 200:
            return
        text = response.text
        if 'route_activity_id_foreign' in text:
            raise self.BackendException('Category id is illegal: {}'.format(data))
        if 'route.keyword may not be greater' in text:
            raise self.BackendException('Keywords too long: {}'.format(data))
        if 'Unauthorized' in text:
            raise self.BackendException('{}: Needs authentication data'.format(self.url))
        raise self.BackendException(text)

    def _write_all(self, gpxfile) ->str:
        """save full gpx gpxfile at Openrunner.

        Returns:
            The new id_in_backend

        Raises:
            BackendException: the gpxfile has no points or the server
                response contains no id.

        """
        points = list(gpxfile.points())
        if not points:
            # start, end and the sample interval cannot be computed without points
            raise self.BackendException('Cannot write a gpxfile without points')
        data = {
            'route[activity]': self._legal_categories_numbers[self.encode_category(gpxfile.category)],
            'route[description]': gpxfile.description,
            'route[elevation][sampleEncoded]': self._encode_points(points),
            # NOTE(review): distance is multiplied by 1000 for lengthInMeter below,
            # so this quotient looks like km per point rather than meters - confirm.
            'route[elevation][sampleIntervalInMeter]': gpxfile.distance / len(points),
            'route[end][lat]': points[-1].latitude,
            'route[end][lng]': points[-1].longitude,
            'route[is_private]': 0 if gpxfile.public else 1,
            'route[is_tested]': 1,
            'route[keyword]': ', '.join(gpxfile.keywords),
            'route[labelColor]': '#ffffff',
            'route[lengthInMeter]': min(gpxfile.distance, 1200) * 1000,  # TODO: unittest: is max length still 1200km?
            'route[name]': gpxfile.title,
            'route[official]': 0,
            'route[shape][pointShapeEncoded]': '',
            'route[shape][pointShapeReducedEncoded]': self._encode_points(points[:20]),
            'route[shape][pointWaypointEncoded]': self._encode_points(points),
            'route[shape][pointWaypointType]': 'A' * len(points),
            'route[shape][showMilestone]': 1,
            'route[shape][strokeColor]': "#b71c0c",
            'route[shape][strokeOpacity]': 0.8,
            'route[shape][strokeWidth]': 5,
            'route[source]': 'openrunner-web',
            'route[start][lat]': points[0].latitude,
            'route[start][lng]': points[0].longitude,
            'route[surface]': 0,
            'route[terrain]': 0,
            'route[waymark]': 0,
        }
        response = self.__post(action='route', data=data)
        # Check before str(): str(None) would be the truthy string 'None'.
        raw_ident = response.json().get('id')
        if raw_ident is None or raw_ident == '':
            raise self.BackendException('No id found in response')
        new_ident = str(raw_ident)
        gpxfile.id_in_backend = new_ident
        return new_ident

    def _remove_ident(self, ident: str):
        """remove on the server."""
        self.__delete(action='route', data={'routeIds[0]': ident})

    def detach(self):
        """also close session."""
        super(Openrunner, self).detach()
        # Only close a session which already exists. Going through the
        # session property would create one and log in just to close it.
        ident = str(self)
        if ident in self._session:
            self._session[ident].close()