Source code for mygeotab.altitude.daas_definition


[docs] class DaasError:
[docs] def __init__(self, error: dict): self.error = error self.code = self.error['code'] self.domain = self.error['domain'] self.message = self.error['message']
NOT_FULL_API_CALL_EXCEPTION = Exception("api return did not have all expected attributes, please retry again.")
[docs] class DaasResult: """DaasResult class, the base class for all results returned from calling our application from the gateway Attributes: call_result (dict): The result returned from the api call, this includes our gateway information. api_result (dict): The "apiResult" the result computed by altitude application. jobs (list): all the possible results returned by the altitude application call, normally it should always have the length of 1. job (dict): first result returned by the altitude application call (first element of jobs). daas_errors (list): possible errors list that happened on the gateway. api_result_errors (list): possible errors list that happened on the altitude application. api_result_error_message (str): possible single error message that happened on the altitude application. api_result_error (DaasError): possible single error object that happened on the altitude application. errors (list): list of all the errors (gateway and altitude application) combined together. """
[docs] def __init__(self, call_result: dict): if not call_result: self.errors = [Exception("result is empty"), NOT_FULL_API_CALL_EXCEPTION] raise NOT_FULL_API_CALL_EXCEPTION self.call_result = call_result self.daas_errors = [DaasError(error) for error in self.call_result.get("errors", [])] self.errors = [ Exception(error.message) for error in self.daas_errors ] if "apiResult" not in call_result: self.errors += [Exception("apiResult not present"), NOT_FULL_API_CALL_EXCEPTION] raise NOT_FULL_API_CALL_EXCEPTION self.api_result = self.call_result["apiResult"] self.jobs = self.api_result["results"] self.job = self.jobs[0] self.api_result_errors = [DaasError(error) for error in self.api_result.get("errors", [])] self.api_result_error_message = self.api_result.get("errorMessage", None) self.api_result_error = None self.errors += [ Exception(error.message) for error in self.api_result_errors ] if "error" in self.api_result and self.api_result["error"]: self.api_result_error = DaasError(self.api_result["error"]) self.errors += [ Exception(self.api_result_error.message) ] if self.api_result_error_message and isinstance(self.api_result_error_message, str) and len(self.api_result_error_message): self.errors += [ Exception(self.api_result_error_message) ] elif self.api_result_error_message and isinstance(self.api_result_error_message, dict): self.errors += [ Exception(self.api_result_error_message["message"]) ]
[docs] class DaasGetJobStatusResult(DaasResult): """DaasGetJobStatusResult class, the returned format for checking the status of the job Attributes: id (str): the id of the job returned status (dict): the status of the job state (str): the state of the job (from the status object) """
[docs] def __init__(self, call_result: dict): super().__init__(call_result) self.id = self.job['id'] self.status = self.job.get("status", {'state': 'FAILED'}) self.state = self.status.get("state", "FAILED")
[docs] def has_finished(self): if self.state == 'DONE': return True elif self.state != 'FAILED': return False elif self.state == 'FAILED' and self.errors and len(self.errors) > 0: return False else: raise Exception("got to failed state with no error, please reach out.")
[docs] class DaasGetQueryResult(DaasResult): """DaasGetQueryResult class, the returned format for checking the result of the job Attributes: totalRows (str): the id of the job returned rows (list): the rows including the data pageToken (str): the token of the page """
[docs] def __init__(self, call_result: dict): super().__init__(call_result) self.total_rows = self.job.get('totalRows', None) self.rows = self.job.get('rows', None) self.page_token = self.job.get('pageToken', None)