igf_api_py_client/Connection.py

89 lines
3.1 KiB
Python
Raw Permalink Normal View History

2024-12-22 14:10:39 +03:00
import time
from datetime import datetime, date
from requests.exceptions import HTTPError
import requests
class Connection:
    """HTTP connection to an IGF API with bearer-token authentication.

    The credentials and base URL are read from the ``config`` mapping under
    the keys ``IGF_USER``, ``IGF_PASS`` and ``IGF_URL``. ``login`` obtains an
    access token; ``raw_request`` logs in (again) automatically whenever no
    valid token is held.
    """

    # Timestamp layouts tried, in order, when parsing the token expiry.
    # NOTE(review): the exact layout returned by the server is not visible
    # here; only "<date> <time>" separated by a space is implied -- confirm.
    _EXPIRY_FORMATS = ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f')

    def __init__(self, config):
        """Store credentials and base URL taken from ``config``.

        Raises:
            KeyError: if ``IGF_USER``, ``IGF_PASS`` or ``IGF_URL`` is missing.
        """
        self._login = config['IGF_USER']
        self._password = config['IGF_PASS']
        self.url = config['IGF_URL']
        # Both stay empty strings until the first successful login().
        self.access_token = ''
        self.access_token_expired_at = ''

    def login(self):
        """Authenticate against ``/secure/auth`` and store the access token.

        Raises:
            RuntimeError: if the auth request failed (the request helper
                reports failures by returning ``None``).
            KeyError: if the response lacks ``access_token`` or
                ``access_token_expires_at``.
        """
        json_response = self.raw_request_without_auth(
            method="post",
            endpoint="/secure/auth",
            data={'username': self._login, 'password': self._password},
        )
        if json_response is None:
            # Without this guard the subscript below fails with an opaque
            # "'NoneType' object is not subscriptable".
            raise RuntimeError('Login failed: no valid response from /secure/auth')
        self.access_token = json_response['access_token']
        self.access_token_expired_at = json_response['access_token_expires_at']
        # The token itself is deliberately not printed: it is a credential.
        print('Login Success!', self.access_token_expired_at)

    def raw_request(self, method, endpoint, params=None, data=None):
        """Send an authenticated request and return the decoded JSON body.

        Logs in first when no token is held or the stored token has expired.

        Args:
            method: HTTP method name passed to ``requests.request``.
            endpoint: path appended to ``self.url``.
            params: optional query parameters (defaults to ``{}``).
            data: optional request body passed as ``data`` (defaults to ``{}``).

        Returns:
            The parsed JSON response, or ``None`` if the request raised an
            error (the error is printed).
        """
        if self._token_expired():
            print("exp")
            self.login()
        return self._send(method, endpoint, params, data,
                          auth=BearerAuth(self.access_token))

    def raw_request_without_auth(self, method, endpoint, params=None, data=None):
        """Send a request without an authorization header.

        Arguments and return value are the same as for ``raw_request``.
        """
        return self._send(method, endpoint, params, data)

    def set_url(self, url: str):
        """Replace the base URL used for subsequent requests."""
        self.url = url

    def _token_expired(self):
        """Return True when there is no token or its expiry time has passed."""
        if not self.access_token or not self.access_token_expired_at:
            return True
        # NOTE(review): compares against naive local time, as the original
        # code did -- confirm the server reports expiry in local time.
        return self._parse_expiry(self.access_token_expired_at) < datetime.today()

    @classmethod
    def _parse_expiry(cls, value):
        """Parse an expiry timestamp, keeping the time of day when possible.

        Falls back to the date part alone (midnight) when the time part is
        in an unrecognised layout. Raises ``ValueError`` if even the date
        cannot be parsed.
        """
        for fmt in cls._EXPIRY_FORMATS:
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
        return datetime.strptime(value.split(" ")[0], '%Y-%m-%d')

    def _send(self, method, endpoint, params, data, auth=None):
        """Perform the HTTP call shared by both public request methods."""
        try:
            response = requests.request(
                method=method,
                url="{url}{endpoint}".format(url=self.url, endpoint=endpoint),
                auth=auth,
                params={} if params is None else params,
                data={} if data is None else data,
            )
            response.raise_for_status()
        except HTTPError as http_err:
            print(f'HTTP error occurred: {http_err}')
        except Exception as err:
            # Best-effort by design: failures are reported and yield None.
            print(f'Other error occurred: {err}')
        else:
            return response.json()
        return None
class BearerAuth(requests.auth.AuthBase):
    """Auth callable that adds a bearer token to a request's headers."""

    def __init__(self, token):
        # Token that follows the "Bearer " prefix in the header value.
        self.token = token

    def __call__(self, r):
        """Set the ``authorization`` header on ``r`` and return ``r``."""
        header_value = "Bearer " + self.token
        r.headers["authorization"] = header_value
        return r