Source code for mygeotab.ext.feed

# -*- coding: utf-8 -*-

"""
mygeotab.ext.feed
~~~~~~~~~~~~~~~~~

A simple data feed wrapper, written as an extension to the MyGeotab API object.
"""

import abc
from threading import Thread
from time import sleep

from mygeotab import api
from requests.exceptions import ConnectionError


[docs] class DataFeedListener(object): """The abstract DataFeedListener to override""" __metaclass__ = abc.ABCMeta
[docs] @abc.abstractmethod def on_data(self, data): """Called when rows of data are received. :param data: A list of data objects. """ return
[docs] @abc.abstractmethod def on_error(self, error): """Called when server errors are encountered. Return False to close the stream. :rtype: bool :param error: The error object. :return: If True, keep listening. If False, stop the data feed. """ return False
[docs] class DataFeed(object): """A simple wrapper for the MyGeotab Data Feed. Create a listener that inherits from DataFeedListener to pass in. """
[docs] def __init__(self, client_api, listener, type_name, interval, search=None, results_limit=None): """Initializes the DataFeed object. :param client_api: The MyGeotab API object. :param listener: The custom DataFeedListener object. :param type_name: The type of entity. :param interval: The data retrieval interval (in seconds). :param search: The search object. :param results_limit: The maximum number of records to return. """ self.client_api = client_api self.listener = listener self.type_name = type_name self.interval = interval self.search = search self.results_limit = results_limit self.running = False self._version = None self._thread = None
def _run(self): """Runner for the Data Feed.""" while self.running: try: result = self.client_api.call( "GetFeed", type_name=self.type_name, search=self.search, from_version=self._version, results_limit=self.results_limit, ) self._version = result["toVersion"] self.listener.on_data(result["data"]) except (api.MyGeotabException, ConnectionError) as exception: if self.listener.on_error(exception) is False: break if not self.running: break sleep(self.interval) self.running = False
[docs] def start(self, threaded=True): """Start the data feed. :param threaded: If True, run in a separate thread. """ self.running = True if threaded: self._thread = Thread(target=self._run) self._thread.start() else: self._run()