# Source code for gpxity.backends.openrunner

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019 Wolfgang Rohdewald <wolfgang@rohdewald.de>
# See LICENSE for details.

"""This implements :class:`gpxity.openrunner.Openrunner` for https://www.openrunner.com."""

# pylint: disable=protected-access

from html.parser import HTMLParser
import datetime
from collections import defaultdict

import logging

import requests
from gpxpy import gpx as mod_gpx


from .. import Backend

# Short alias for the gpxpy point class; Encoding.decode_points builds instances of it.
GPXTrackPoint = mod_gpx.GPXTrackPoint

# Developer toggle: change the constant to True to set debuglevel = 1 on
# http.client.HTTPConnection. It is switched off by default.
if False:  # pylint: disable=using-constant-test
    try:
        import http.client as http_client
    except ImportError:
        # Python 2
        import httplib as http_client
    http_client.HTTPConnection.debuglevel = 1

# Names exported by "from ... import *".
__all__ = ['Openrunner', 'Encoding']


class Encoding:

    """Openrunner transfers point data in a compressed format.

    Coordinates are scaled by 100000 and rounded to integers. Every value
    is stored as the difference to the previous point, the sign is folded
    into the lowest bit and the result is written in 5 bit groups, least
    significant group first, as printable characters starting at chr(63).
    All groups but the last one of a number have 32 added as a
    continuation marker.

    NOTE(review): this matches the "Encoded Polyline Algorithm Format".
    """

    @staticmethod
    def encode_number(nbr) -> str:
        """Encode a single unsigned number.

        Args:
            nbr: A non-negative integer

        Returns:
            the encoded string

        """
        chunks = []
        while nbr >= 32:
            # 95 == 63 + 32: a 5 bit group with the continuation marker
            chunks.append(chr(95 + (nbr & 31)))
            nbr >>= 5
        # the last group has no continuation marker
        chunks.append(chr(63 + nbr))
        return ''.join(chunks)

    @staticmethod
    def encode_signed_number(nbr) -> str:
        """Encode a single signed number.

        The sign moves into the lowest bit: Non-negative numbers become
        even, negative numbers become odd.

        Args:
            nbr: An integer

        Returns:
            The encoded string

        """
        tmp = nbr << 1
        if nbr < 0:
            tmp = ~tmp
        return Encoding.encode_number(tmp)

    @staticmethod
    def encode_points(points) -> str:
        """Encode a list of points.

        Args:
            points: An iterable of objects having the attributes
                latitude and longitude

        Returns:
            the encoded string. It is empty if there are no points.

        """
        chunks = []
        prev_lat = prev_lon = 0
        for point in points:
            lat = round(point.latitude * 100000)
            lon = round(point.longitude * 100000)
            # only the difference to the previous point is transferred
            chunks.append(Encoding.encode_signed_number(lat - prev_lat))
            chunks.append(Encoding.encode_signed_number(lon - prev_lon))
            prev_lat, prev_lon = lat, lon
        return ''.join(chunks)

    @staticmethod
    def _decode_numbers(input_str) -> list:
        """Decode str into the flat list of signed integers it contains.

        The string is scanned with an index. Cutting off the consumed
        characters after every step would copy the rest of the string
        again and again.

        Args:
            input_str: The encoded string

        Returns:
            list(int)

        Raises:
            IndexError: input_str ends in the middle of a number

        """
        numbers = []
        pos = 0
        length = len(input_str)
        while pos < length:
            result = 0
            shift = 0
            while True:
                # raises IndexError if the last group announces a continuation
                ord_value = ord(input_str[pos]) - 63
                pos += 1
                result |= (31 & ord_value) << shift
                if ord_value < 32:
                    break
                shift += 5
            # the lowest bit holds the sign
            numbers.append(~(result >> 1) if result & 1 else result >> 1)
        return numbers

    @staticmethod
    def decode_points(input_str) -> list:
        """Decode str into a list of points.

        Args:
            input_str: The encoded string as produced by :meth:`encode_points`

        Returns:
            list(GPXTrackPoint). It is empty if input_str is empty.

        Raises:
            IndexError: input_str is truncated: It ends within a number
                or after a latitude

        """
        if not input_str:
            return []
        numbers = Encoding._decode_numbers(input_str)
        if len(numbers) % 2:
            raise IndexError('encoded points end with a latitude lacking its longitude')

        def blow_up(nbr):
            """Scale the integer back to degrees."""
            return round(0.00001 * nbr, 5)

        result = []
        latitude = longitude = 0
        for delta_lat, delta_lon in zip(numbers[::2], numbers[1::2]):
            latitude += delta_lat
            longitude += delta_lon
            result.append(GPXTrackPoint(latitude=blow_up(latitude), longitude=blow_up(longitude)))
        return result
class OpenrunnerRawTrack:

    """Raw data of one track as found on the Openrunner html page.

    All attributes start as None. :class:`ParseOpenrunnerList` fills in
    what it finds.
    """

    # pylint: disable=too-few-public-methods

    def __init__(self):
        """See class docstring."""
        self.track_id = None
        self.title = None
        self.time = None
        self.distance = None
        self.category = None


class ParseOpenrunnerCategories(HTMLParser):  # pylint: disable=abstract-method

    """Parse the legal values for category from html.

    The values are the options of the select element with the
    id "search_activity". After :meth:`feed`, result_names holds the
    names and result_ids the numbers. Both start with the
    entry for 'Autre' (2).
    """

    def __init__(self):
        """See class docstring."""
        super(ParseOpenrunnerCategories, self).__init__()
        self.result_names = ['Autre']
        self.result_ids = [2]
        self.current_tag = None
        self.seeing_list = False

    def feed(self, data):
        """Get data. Every call starts outside of the select element."""
        self.seeing_list = False
        super(ParseOpenrunnerCategories, self).feed(data)

    def handle_starttag(self, tag, attrs):
        """Starttag from the parser."""
        self.current_tag = tag
        # missing attributes read as ''
        attributes = defaultdict(str, attrs)
        if tag == 'select' and attributes['id'] == 'search_activity':
            self.seeing_list = True
        if self.seeing_list and tag == 'option':
            # the placeholder option has an empty value
            if attributes['value'] != '':
                self.result_ids.append(int(attributes['value']))

    def handle_endtag(self, tag):
        """Handle end of the category list."""
        if tag == 'select':
            self.seeing_list = False

    def handle_data(self, data):
        """Data from the parser."""
        if not self.seeing_list:
            return
        data = data.strip()
        if not data:
            return
        if self.current_tag == 'option' and data != 'Select an activity':
            self.result_names.append(data)


class ParseOpenrunnerSubscription(HTMLParser):  # pylint: disable=abstract-method

    """Parse the current subscription from html.

    The parser remembers the latest text following a tag with the
    class "card-title". When the text 'Subscription in progress' shows
    up, that remembered title is the result. Everything after that
    is ignored.
    """

    def __init__(self):
        """See class docstring."""
        super(ParseOpenrunnerSubscription, self).__init__()
        self.category = None
        self.seeing_card_title = False
        self.current_card_title = None
        self.result = None

    def handle_starttag(self, tag, attrs):
        """Starttag from the parser."""
        # most tags have no class attribute, so do not index the dict directly
        self.seeing_card_title = dict(attrs).get('class') == 'card-title'
        if self.seeing_card_title:
            self.current_card_title = None

    def handle_data(self, data):
        """Handle data."""
        if self.result is not None:
            # We are done. Do not call reset() here to stop parsing:
            # that would wipe the buffer feed() is still working on
            # and make the parser fail on the markup that follows.
            return
        data = data.strip()
        if data == '':
            return
        if self.seeing_card_title:
            self.current_card_title = data
        if data == 'Subscription in progress':
            self.result = self.current_card_title


class ParseOpenrunnerActivity(HTMLParser):  # pylint: disable=abstract-method

    """Parse the category value for a track from html.

    category gets the id of the checked input element with
    the name "trackTypes". It stays None if there is none.
    """

    def __init__(self):
        """See class docstring."""
        super(ParseOpenrunnerActivity, self).__init__()
        self.current_tag = None
        self.category = None

    def handle_starttag(self, tag, attrs):
        """Starttag from the parser."""
        self.current_tag = tag
        # TODO: class MyHTMLParser with attributes getter
        attributes = dict(attrs)
        # input elements without name or id must not raise KeyError
        if tag == 'input' and attributes.get('name') == 'trackTypes' and 'checked' in attributes:
            self.category = attributes.get('id')


class ParseOpenrunnerList(HTMLParser):  # pylint: disable=abstract-method

    """Get some attributes available only on the web page.

    Of course, this is highly unreliable. Just use what we can get.

    After :meth:`feed`, result['tracks'] is a list of
    :class:`OpenrunnerRawTrack`, one for every table row having
    a date in column 10.
    """

    def __init__(self):
        """See class docstring."""
        super(ParseOpenrunnerList, self).__init__()
        self.result = dict()
        self.result['tracks'] = list()
        self.track = None
        self.column = 0
        self.current_tag = None
        self.seeing_list = False
        self.after_list = False
        self.seeing_a = False

    def feed(self, data):
        """Get data. Every call restarts the state but keeps the tracks found so far."""
        self.track = None
        self.column = 0
        self.current_tag = None
        self.seeing_list = False
        self.after_list = False
        self.seeing_a = False
        super(ParseOpenrunnerList, self).feed(data)

    def handle_starttag(self, tag, attrs):
        """Starttag from the parser."""
        self.current_tag = tag
        if tag == 'tbody':
            self.seeing_list = True
        if not self.seeing_list:
            return
        if tag == 'tr':
            # a new row is a new track
            self.track = OpenrunnerRawTrack()
            self.column = 0
            self.seeing_a = False
        elif tag == 'td':
            self.column += 1
        elif self.after_list and tag == 'a':
            # only reachable within a second tbody
            self.seeing_a = True

    def handle_endtag(self, tag):
        """Handle end of track list."""
        if tag == 'tbody':
            self.seeing_list = False
            self.after_list = True

    def handle_data(self, data):
        """Data from the parser. The column number decides about the meaning."""
        data = data.strip()
        if not data:
            return
        if not self.seeing_list or self.track is None:
            # outside of the list or before the first row
            return
        if self.column == 2:
            self.track.track_id = data
        elif self.column == 3:
            # only the first h6 in this column is the title
            if self.current_tag == 'h6' and self.track.title is None:
                self.track.title = data
        elif self.column == 4:
            self.track.category = data
        elif self.column == 7:
            self.track.distance = float(data)
        elif self.column == 10:
            self.track.time = datetime.datetime.strptime(data, '%d-%m-%Y')
            # the date is the last value we use: the track is complete
            self.result['tracks'].append(self.track)
class Openrunner(Backend):

    """The implementation for openrunner.com.

    The track ident is the ID given by openrunner.

    Searching arbitrary tracks is not supported. Openrunner only looks at the
    tracks of a specific user.

    Args:
        url (str): The Url of the server. Default is https://www.openrunner.com
        auth (tuple(str, str)): Username and password
        cleanup (bool): If True, :meth:`~gpxity.backend.Backend.destroy` will remove all tracks in the
            user account.
        timeout: If None, there are no timeouts: Openrunner waits forever. For legal values
            see http://docs.python-requests.org/en/master/user/advanced/#timeouts

    """

    # pylint: disable=abstract-method

    max_field_sizes = {'keywords': 200}

    _default_description = 'None yet. Let everyone know how you got on.'

    _default_public = True  # a basic account does not support private tracks

    point_precision = 5

    supported_categories = (
        'Cycling - Road', 'Canoe-Kayak', 'Cycling - Gravel', 'Cycling - MTB', 'Cycling - Touring',
        'Footbiking', 'Hiking', 'Horse riding', 'Longboard', 'Nordic walking', 'River navigation',
        'Rollerblading', 'Running - Trail', 'Running - Urban Trail', 'Running- Road',
        'Skiing - Backcountry', 'Skiing - Crosscountry', 'Skiing - Rollerskiing', 'Skiing - Touring',
        'Snowshoeing', 'Stand Up Paddle', 'Swimming', 'Swimrun', 'Walking', 'Autre'
    )

    # translate Openrunner names to internal names
    _category_decoding = {
        'Canoe-Kayak': 'Canoeing',
        'Cycling - Touring': 'Cycling',
        'Cyclisme - Randonnée': 'Cycling - Randonnée',
        'Footbiking': 'Cycling - Foot',
        'Longboard': 'Miscellaneous',
        'Other': 'Miscellaneous',
        'River navigation': 'Miscellaneous',
        'Rollerblading': 'Skating - Inline',
        'Running - Trail': 'Running',
        'Running - Urban Trail': 'Running',
        'Running- Road': 'Running - Road',
        'Skiing - Backcountry': 'Skiing - Touring',
        'Skiing - Crosscountry': 'Cross country skiing',
        'Skiing - Rollerskiing': 'Skiing - Roller',
        'Stand Up Paddle': 'Stand up paddle boarding',
        'Swimrun': 'Swimming',
    }

    # translate internal names to Openrunner names
    _category_encoding = {
        'Cabriolet': 'Autre',
        'Canoeing': 'Canoe-Kayak',
        'Coach': 'Autre',
        'Crossskating': 'Autre',
        'Cycling': 'Cycling - Road',
        'Cycling - Foot': 'Footbiking',
        # this was 'Cycling' which is no Openrunner name and has no number
        # in _legal_categories_numbers. Use the same target as for 'Cycling'.
        'Cycling - Hand': 'Cycling - Road',
        'Cycling - Indoor': 'Autre',
        'Driving': 'Autre',
        'Enduro': 'Autre',
        'Flying': 'Autre',
        'Geocaching': 'Autre',
        'Gliding': 'Autre',
        'Handcycle': 'Autre',
        'Hang gliding': 'Autre',
        'Hiking - Speed': 'Hiking',
        'Hot air ballooning': 'Autre',
        'Indoor cycling': 'Cycling - Road',
        'Jet skiing': 'Autre',
        'Kayaking': 'Canoe-Kayak',
        'Kiteboarding': 'Autre',
        'Miscellaneous': 'Autre',
        'Motor racing': 'Autre',
        'Motorcycling': 'Autre',
        'Motorhome': 'Autre',
        'Mountaineering': 'Autre',
        'Nordic walking': 'Autre',
        'Off road driving': 'Autre',
        'Orienteering': 'Autre',
        'Pack animal trekking': 'Horse riding',
        'Paragliding': 'Autre',
        'Pedelec': 'Autre',
        'Powerboating': 'River navigation',
        'Rowing': 'River navigation',
        'Running': 'Running- Road',
        'Running - Road': 'Running- Road',
        'Sailing': 'Autre',
        'Sea kayaking': 'Canoe-Kayak',
        'Sightseeing': 'Autre',
        'Skateboarding': 'Autre',
        'Skating': 'Autre',
        'Skating - Inline': 'Rollerblading',
        'Skiing': 'Skiing - Touring',
        'Skiing - Alpine': 'Autre',
        'Skiing - Nordic': 'Skiing - Backcountry',
        'Skiing - Touring': 'Skiing - Crosscountry',
        'Snowboarding': 'Autre',
        'Snowshoeing': 'Autre',
        'Stand up paddle boarding': 'Stand Up Paddle',
        'Train': 'Autre',
        'Wheelchair': 'Autre',
        'Windsurfing': 'Autre',
    }

    # translate Openrunner names to Openrunner numbers
    _legal_categories_numbers = {
        'Autre': 2,
        'Canoe-Kayak': 18,
        'Cycling - Gravel': 20,
        'Cycling - MTB': 3,
        'Cycling - Road': 1,
        'Cycling - Touring': 11,
        'Footbiking': 13,
        'Hiking': 9,
        'Horse riding': 4,
        'Longboard': 25,
        'Nordic walking': 12,
        'River navigation': 8,
        'Rollerblading': 5,
        'Running - Trail': 10,
        'Running - Urban Trail': 19,
        'Running- Road': 21,
        'Skiing - Backcountry': 14,
        'Skiing - Crosscountry': 15,
        'Skiing - Rollerskiing': 16,
        'Skiing - Touring': 22,
        'Snowshoeing': 17,
        'Stand Up Paddle': 24,
        'Swimming': 6,
        'Swimrun': 23,
        'Walking': 7,
    }

    # every supported category needs a number and vice versa
    assert set(_legal_categories_numbers.keys()) == set(supported_categories)

    default_url = 'https://www.openrunner.com'

    def __init__(self, url=None, auth=None, cleanup=False, timeout=None):
        """See class docstring."""
        if url is None:
            url = self.default_url
        super(Openrunner, self).__init__(url, auth, cleanup, timeout)

    def _download_legal_categories(self):
        """Needed only for unittest.

        Returns: list(str)
            all legal values for category, sorted.

        """
        response = self.__get(action='route/search/page')
        category_parser = ParseOpenrunnerCategories()
        category_parser.feed(response.text)
        return sorted(category_parser.result_names)

    @property
    def current_subscription(self) -> str:
        """Get the current subscription model.

        Returns:
            The name of the subscription or None if the page does not show one.

        """
        # TODO: unused, untested. Intended for use with callers trying
        # to set private to True without having a sufficient subscription.
        parser = ParseOpenrunnerSubscription()
        parser.feed(self.__get(action='user/mysubscription').text)
        self.logger.debug('current subscription: %s', parser.result)
        return parser.result

    @property
    def session(self):
        """The requests.Session for this backend. Only initialized once.

        The first access creates the session. If a password is configured,
        it also logs in and keeps the login response in the attribute
        "response" of the session.

        Returns:
            The session

        Raises:
            BackendException: There is no username or the login failed

        """
        ident = str(self)
        if ident not in self._session:
            if not getattr(self.config, 'username', None):
                raise self.BackendException('{}: Needs authentication data'.format(self.url))
            new_session = requests.Session()
            self._session[ident] = new_session
            if self.config.password:
                data = {
                    'language': 'en',
                    'login': self.config.username,
                    'password': self.config.password,
                }
                new_session.response = new_session.post(
                    '{}/user/login'.format(self.url), data=data, timeout=self.timeout)
                self._check_response(self.session_response, data)
            if self.session_response is None:
                self.logger.info('Openrunner.session got no session_response')
        return self._session[ident]

    @property
    def session_response(self):
        """The response to the login.

        Returns:
            The response or None if there is no session or if the session did not log in

        """
        # TODO: also in GPSIES
        ident = str(self)
        if ident in self._session:
            if hasattr(self._session[ident], 'response'):
                return self._session[ident].response
        return None

    def __http_post(self, post_type, action: str, data=None):
        """Common code for all HTTP requests.

        Args:
            post_type: The name of the HTTP method like "get" or "post"
            action: The path to be appended to the url of the backend
            data: A dict to be sent with the request. It is not changed.

        Returns:
            the response

        Raises:
            BackendException: See :meth:`_check_response`

        """
        # work on a copy, the dict of the caller stays untouched
        data = dict(data) if data else dict()
        data['_'] = int(datetime.datetime.now().timestamp())
        full_url = '{}/{}'.format(self.url, action)
        self.session  # because headers needs accessToken pylint: disable=pointless-statement
        headers = {'X-Language': 'en'}
        if self.session_response:
            # logged in: authorize with the token from the login
            headers['Authorization'] = 'Bearer {}'.format(self.session_response.json()['user']['accessToken'])
            method = getattr(self.session, post_type)
        else:
            method = getattr(requests, post_type)
        response = method(full_url, data=data, headers=headers, timeout=self.timeout)
        self._check_response(response, data)
        return response

    def __get(self, action: str, data=None):
        """Common code for a GET within the session.

        Returns:
            the response

        """
        return self.__http_post("get", action, data)

    def __post(self, action: str, data):
        """Common code for a POST within the session.

        Returns:
            the response

        """
        return self.__http_post("post", action, data)

    def __delete(self, action: str, data):
        """Common code for a DELETE within the session.

        Returns:
            the response

        """
        return self.__http_post("delete", action, data)

    @classmethod
    def decode_category(cls, value: str) -> str:
        """Translate the value from Openrunner into internal one.

        If the base class cannot translate value, it may still be
        'Autre' or the number of a category.

        Args:
            value: The name or the number of the category as given by Openrunner

        Returns:
            The decoded name

        Raises:
            BackendException: value is unknown

        """
        try:
            return super(Openrunner, cls).decode_category(value)
        except Exception:  # pylint: disable=broad-except
            if value == 'Autre':
                # TODO: Should not happen
                logging.error("Openrunner said Autre")
                return 'Miscellaneous'
            reverse_nbr = {number: name for name, number in cls._legal_categories_numbers.items()}
            try:
                number = int(value)
            except (TypeError, ValueError):
                # neither a known name nor a number
                number = None
            if number in reverse_nbr:
                return cls.decode_category(reverse_nbr[number])
            raise cls.BackendException('Openrunner gave us an unknown track type {}'.format(value))

    def _load_track_headers(self):
        """Get all tracks for this user. The values come from the html list."""
        response = self.__get(action='route/findby?author={}'.format(self._get_author()))
        page_parser = ParseOpenrunnerList()
        page_parser.feed(response.text)
        for raw_data in page_parser.result['tracks']:
            track = self._found_track(raw_data.track_id)
            header = track._header_data
            header['title'] = raw_data.title
            header['time'] = raw_data.time
            if raw_data.distance:
                header['distance'] = raw_data.distance
            if raw_data.category:
                header['category'] = self.decode_category(raw_data.category)
            header['public'] = True

    def _read_all(self, track):
        """Get the entire track."""
        response = self.__get(action='route/{}'.format(track.id_in_backend))
        route = response.json()['route']
        points = Encoding.decode_points(route['shape']['full_encoded'])
        track.add_points(points)
        track.title = route['name']
        track.description = route['description']
        track._decode_keywords(route['keyword'])
        # presumably Openrunner gives meters and the header wants kilometers
        track._header_data['distance'] = float(route['length']) / 1000.0
        # the date format seems to depend on the language. fr would be %d-%m-%Y
        track._header_data['time'] = datetime.datetime.strptime(route['updatedDate'], '%Y/%m/%d')
        track.public = not route['private']
        track.category = self.decode_category(route['activity'])
        self.logger.debug('_read_all category: %s -> %s', route['activity'], track.category)

    def _check_response(self, response, data):
        """Are there error messages?.

        Args:
            response: The response from the server
            data: The data sent with the request. Used for some error messages.

        Raises:
            BackendException: The status code is not 200

        """
        if response.status_code == 200:
            return
        text = response.text
        if 'route_activity_id_foreign' in text:
            raise self.BackendException('Category id is illegal: {}'.format(data))
        if 'route.keyword may not be greater' in text:
            raise self.BackendException('Keywords too long: {}'.format(data))
        if 'Unauthorized' in text:
            raise self.BackendException('{}: Needs authentication data'.format(self.url))
        raise self.BackendException(text)

    def _write_all(self, track) -> str:
        """Save full gpx track at Openrunner.

        The track is always written as a new route. If the track
        already had an id, the old route is removed afterwards.

        Returns:
            The new id_in_backend

        Raises:
            BackendException: The track has no points or Openrunner returned no id

        """
        if not track.gpx.get_track_points_no():
            raise self.BackendException('Openrunner does not accept a track without trackpoints:{}'.format(track))
        points = list(track.points())
        encoded_points = Encoding.encode_points(points)
        data = {
            'route[activity]': self._legal_categories_numbers[self.encode_category(track.category)],
            'route[description]': track.description,
            'route[elevation][sampleEncoded]': encoded_points,
            'route[elevation][sampleIntervalInMeter]': track.distance() / len(points),
            'route[end][lat]': points[-1].latitude,
            'route[end][lng]': points[-1].longitude,
            # TODO: private only if the subscription allows that: 0 if track.public else 1
            'route[is_private]': 0,
            'route[is_tested]': 1,
            'route[keyword]': ', '.join(track.keywords),
            'route[labelColor]': '#ffffff',
            # TODO: unittest: is max length still 1200km?
            'route[lengthInMeter]': min(track.distance(), 1200) * 1000,
            'route[name]': track.title,
            'route[official]': 0,
            'route[shape][pointShapeEncoded]': '',
            'route[shape][pointShapeReducedEncoded]': Encoding.encode_points(points[:20]),
            'route[shape][pointWaypointEncoded]': encoded_points,
            'route[shape][pointWaypointType]': 'A' * len(points),
            'route[shape][showMilestone]': 1,
            'route[shape][strokeColor]': "#b71c0c",
            'route[shape][strokeOpacity]': 0.8,
            'route[shape][strokeWidth]': 5,
            'route[source]': 'openrunner-web',
            'route[start][lat]': points[0].latitude,
            'route[start][lng]': points[0].longitude,
            'route[surface]': 0,
            'route[terrain]': 0,
            'route[waymark]': 0,
        }
        response = self.__post(action='route', data=data)
        answer = response.json()
        raw_ident = answer.get('id') if isinstance(answer, dict) else None
        # check before str(): str(None) would be the non-empty 'None'
        if raw_ident is None or raw_ident == '':
            raise self.BackendException('No id found in response')
        new_ident = str(raw_ident)
        old_ident = track.id_in_backend
        track.id_in_backend = new_ident
        if old_ident:
            self._remove_ident(old_ident)
        # NOTE(review): the original code assigns the id again after the removal - kept, confirm if needed
        track.id_in_backend = new_ident
        self.logger.debug('%s fully written', track)
        return new_ident

    def _remove_ident(self, ident: str):
        """Remove on the server."""
        self.__delete(action='route', data={'routeIds[0]': ident})

    def destroy(self):
        """Also close session.

        Only a session which already exists is closed. Asking the
        property :attr:`session` would create one and log in or
        raise an exception if there is no username.
        """
        super(Openrunner, self).destroy()
        existing_session = self._session.get(str(self))
        if existing_session is not None:
            existing_session.close()