89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
|
import time
|
||
|
from datetime import datetime, date
|
||
|
from requests.exceptions import HTTPError
|
||
|
import requests
|
||
|
|
||
|
|
||
|
class Connection:
    """Small HTTP client that talks to the IGF API with bearer-token auth.

    ``config`` must provide the keys ``IGF_USER``, ``IGF_PASS`` and
    ``IGF_URL``. The access token is obtained by :meth:`login` and refreshed
    automatically by :meth:`raw_request` when it is missing or looks expired.
    """

    def __init__(self, config):
        """Store credentials and base URL; no network traffic happens here."""
        self._login = config['IGF_USER']
        self._password = config['IGF_PASS']
        self.url = config['IGF_URL']
        # Both stay empty until login() succeeds; an empty value is treated
        # as "expired" so the first authenticated request logs in by itself.
        self.access_token = ''
        self.access_token_expired_at = ''

    def login(self):
        """Authenticate against ``/secure/auth`` and remember the token.

        Raises:
            RuntimeError: the auth request failed (the underlying error has
                already been printed by the request helper).
            KeyError: the response lacks ``access_token`` or
                ``access_token_expires_at``.
        """
        json_response = self.raw_request_without_auth(
            method="post",
            endpoint="/secure/auth",
            data={'username': self._login, 'password': self._password},
        )
        if json_response is None:
            raise RuntimeError('Login failed: no usable response from /secure/auth')

        # Read both fields before assigning so a malformed response cannot
        # leave the connection with a new token but a stale expiry.
        token = json_response['access_token']
        expires_at = json_response['access_token_expires_at']
        self.access_token = token
        self.access_token_expired_at = expires_at
        # The token itself is deliberately not printed: it is a credential.
        print('Login Success!', self.access_token_expired_at)

    def raw_request(self, method, endpoint, params=None, data=None):
        """Send an authenticated request and return the decoded JSON body.

        Logs in first when there is no token yet or the stored expiry date
        has passed.

        Returns:
            The parsed JSON response, or ``None`` when the request failed
            (the error is printed, not raised).

        Raises:
            RuntimeError: a required (re-)login failed.
        """
        if self._token_expired():
            print("exp")
            self.login()

        return self._send(
            method,
            endpoint,
            params=params,
            data=data,
            auth=BearerAuth(self.access_token),
        )

    def raw_request_without_auth(self, method, endpoint, params=None, data=None):
        """Send a request without an ``Authorization`` header.

        Returns:
            The parsed JSON response, or ``None`` when the request failed
            (the error is printed, not raised).
        """
        return self._send(method, endpoint, params=params, data=data)

    def set_url(self, url: str):
        """Replace the base URL used for all subsequent requests."""
        self.url = url

    def _token_expired(self):
        """Return True when a (re-)login is needed before an authed request.

        Only the date part of ``access_token_expired_at`` is compared, so a
        token counts as expired from the start of its expiry day.
        TODO: refine the comparison to take the time part into account.
        NOTE(review): the check uses the local naive clock; confirm the
        server reports the expiry in the same timezone.
        """
        if not self.access_token:
            return True

        # Accept both "YYYY-MM-DD HH:MM:SS" and ISO "YYYY-MM-DDTHH:MM:SS".
        pieces = str(self.access_token_expired_at).replace('T', ' ').split()
        if not pieces:
            return True
        try:
            expired_at = datetime.strptime(pieces[0], '%Y-%m-%d')
        except ValueError:
            # Unknown format: safer to re-authenticate than to crash.
            return True
        return expired_at < datetime.today()

    def _send(self, method, endpoint, params=None, data=None, auth=None):
        """Perform the HTTP call shared by both public request methods.

        Transport and HTTP-status errors are printed and turned into a
        ``None`` result. An error while decoding the JSON body of a
        successful response is not caught and propagates to the caller.
        """
        try:
            response = requests.request(
                method=method,
                url="{url}{endpoint}".format(url=self.url, endpoint=endpoint),
                auth=auth,
                params={} if params is None else params,
                data={} if data is None else data,
            )
            response.raise_for_status()
        except HTTPError as http_err:
            print(f'HTTP error occurred: {http_err}')  # Python 3.6
        except Exception as err:
            print(f'Other error occurred: {err}')  # Python 3.6
        else:
            return response.json()
        return None
|
||
|
|
||
|
|
||
|
class BearerAuth(requests.auth.AuthBase):
    """Auth hook for ``requests`` that adds an ``authorization: Bearer`` header."""

    def __init__(self, token):
        """Remember the token that will be attached to outgoing requests."""
        self.token = token

    def __call__(self, r):
        """Set the bearer header on the request ``r`` and return it."""
        header_value = "Bearer " + self.token
        r.headers["authorization"] = header_value
        return r
|