Source code for mygeotab.altitude.wrapper

import sys
import time
import logging
from .daas_definition import DaasGetQueryResult, DaasGetJobStatusResult, DaasResult, NOT_FULL_API_CALL_EXCEPTION
from ..api import API, DEFAULT_TIMEOUT


class AltitudeAPI(API):
    def __init__(
        self,
        username,
        password=None,
        database=None,
        session_id=None,
        server="my.geotab.com",
        timeout=DEFAULT_TIMEOUT,
        proxies=None,
        cert=None,
    ):
        """
        A wrapper around the MyGeotab API for Altitude users.

        :param username: The username used for MyGeotab servers. Usually an email address.
        :type username: str
        :param password: The password associated with the username. Optional if `session_id` is provided.
        :type password: str
        :param database: The database or company name. Optional as this usually gets resolved upon authentication.
        :type database: str
        :param session_id: A session ID, assigned by the server.
        :type session_id: str
        :param server: The server, e.g. my23.geotab.com. Optional as this usually gets resolved upon authentication.
        :type server: str or None
        :param timeout: The timeout for the call, in seconds. By default, this is 300 seconds (or 5 minutes).
        :type timeout: float or None
        :param proxies: The proxies dictionary to apply to the request.
        :type proxies: dict or None
        :param cert: The path to the client certificate. A single path to a .pem file or a tuple (.cer file, .key file).
        :type cert: str or Tuple or None
        :raise Exception: Raises an Exception if a username, or one of session_id or password, is not provided.
        """
        super().__init__(
            username=username,
            password=password,
            database=database,
            session_id=session_id,
            server=server,
            timeout=timeout,
            proxies=proxies,
            cert=cert,
        )
        logging.basicConfig(
            stream=sys.stdout,
            level=logging.INFO,
            format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    def _call_api(self, service_name: str, function_name: str, function_parameters: dict) -> dict:
        results = self.call(
            method="GetBigDataResults",
            serviceName=service_name,
            functionName=function_name,
            functionParameters=function_parameters,
        )
        return results
    def call_api(self, function_name: str, params: dict) -> dict:
        """
        Supports getJobStatus, getQueryResults, and createQueryJob calls.
        Retries in case of errors such as a connection reset.
        """
        assert function_name in ["getJobStatus", "getQueryResults", "createQueryJob"]
        max_tries = 5
        for try_index in range(max_tries):
            try:
                call_result = self._call_api(
                    service_name=params["serviceName"],
                    function_name=function_name,
                    function_parameters=params["functionParameters"],
                )
                # Validate that the response parses into a DaasResult before returning it.
                DaasResult(call_result)
                return call_result
            except Exception as e:
                if e == NOT_FULL_API_CALL_EXCEPTION:
                    if try_index == max_tries - 1:
                        raise e
                    else:
                        print(f"encountered error trying to parse the API response {str(call_result)}, retrying...")
                        time.sleep((try_index + 1) * 10)
                else:
                    raise e
    def create_job(self, params: dict) -> dict:
        """
        Creates the job with the given params.
        """
        results = None
        try:
            results = self.call_api(
                function_name="createQueryJob",
                params=params,
            )
            errors = results.get("errors", [])
            if errors and len(errors) > 0:
                raise errors[0]
            return results["apiResult"]["results"][0]
        except Exception as e:
            logging.error(f"Exception: {e}")
            logging.error(f"error: error while creating job, results: {results}")
            raise e
    def check_job_status(self, params: dict) -> DaasGetJobStatusResult:
        """
        Checks the status of a given job. jobId needs to be included in params.
        """
        call_result = self.call_api("getJobStatus", params)
        daas_status = DaasGetJobStatusResult(call_result)
        return daas_status
    def wait_for_job_to_complete(self, params: dict) -> dict:
        """
        Waits for a job to finish running and returns the job. jobId needs to be included in params.
        """
        daas_status = None
        while True:
            try:
                daas_status = self.check_job_status(params)
                if daas_status.errors and len(daas_status.errors) > 0:
                    raise daas_status.errors[0]
                if daas_status.has_finished():
                    break
                time.sleep(5)
                logging.info(f"waiting for results: {daas_status.job}")
            except Exception as e:
                logging.error(f"Exception: {e}")
                job = daas_status.job if daas_status else None
                logging.error(f"error: error while waiting for job to complete, result: {job}")
                raise e
        return daas_status.job
    def fetch_data(self, params: dict) -> dict:
        """
        Fetches data for the given params, one page at a time. jobId needs to be included in params.
        """
        index = 1
        results = None
        daas_status = self.check_job_status(params)
        if (daas_status.errors and len(daas_status.errors) > 0) or (not daas_status.has_finished()):
            raise Exception("fetch data was called before job had finished correctly.")
        while index:
            try:
                call_result = self.call_api("getQueryResults", params)
                daas_result = DaasGetQueryResult(call_result)
                results = daas_result.job
                rows = daas_result.rows
                page_token = daas_result.page_token
                total_rows = daas_result.total_rows
                if index == 1:
                    logging.info(f"total number of rows: {total_rows}")
                params["functionParameters"]["pageToken"] = page_token
                yield {"data": [rows, total_rows, index], "errors": daas_result.errors}
                index += 1
                if not page_token:
                    # No more pages: stop iterating and yield a final None so
                    # consumers (see get_data) know the stream is exhausted.
                    index = None
                    yield
            except Exception as e:
                logging.error(f"Exception: {e}")
                logging.error(f"error: error while fetching data, results: {results}")
                index = None
                raise e
    def get_data(self, params: dict) -> list:
        """
        Iterates through fetch_data for the given params and returns the combined data.
        jobId needs to be included in params.
        """
        data = []
        results_iterator = self.fetch_data(params)
        for data_page in results_iterator:
            try:
                if isinstance(data_page, str):
                    raise Exception("received error:", data_page)
                page = [] if data_page is None else data_page["data"][0]
                errors: list = None if data_page is None else data_page.get("errors", None)
                if errors and len(errors) > 0:
                    logging.error(f"got error when getting data: {errors[0]}")
                    raise errors[0]
                data.extend(page)
            except Exception as e:
                logging.error(f"error: error while combining data: {e}")
                raise e
        return data
    def do(self, params: dict) -> list:
        """
        Given the parameters, creates the query job, waits for it to finish, and returns the combined data.
        """
        logging.info("creating job")
        job = self.create_job(params)
        logging.info(f"job created: {job}")
        params["functionParameters"]["jobId"] = job["id"]
        logging.info("checking the job status")
        results = self.wait_for_job_to_complete(params)
        logging.info(f"job finished: {results}")
        logging.info("gathering results")
        data = self.get_data(params)
        logging.info("data gathered")
        return data
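
A typical workflow with this class is: construct the client, authenticate, then hand do() a params dictionary containing the keys that call_api() and do() read ("serviceName" and "functionParameters"; do() fills in "jobId" itself). The sketch below illustrates that flow; it is not part of the module above, and the service name and query field shown are placeholders rather than documented Altitude values.

from mygeotab.altitude.wrapper import AltitudeAPI

api = AltitudeAPI(username="user@example.com", password="***", database="my_database")
api.authenticate()  # standard mygeotab API authentication

params = {
    "serviceName": "SomeAltitudeService",  # placeholder: an actual Altitude service name
    "functionParameters": {
        # placeholder: whatever parameters the service's createQueryJob expects
        "query": "SELECT ...",
    },
}

# do() creates the query job, polls getJobStatus until it has finished,
# then pages through getQueryResults and returns the combined rows.
rows = api.do(params)
print(f"fetched {len(rows)} rows")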