-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path_http_utils.py
More file actions
50 lines (39 loc) · 1.27 KB
/
_http_utils.py
File metadata and controls
50 lines (39 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import ssl
import certifi
async def get(*, session, url, headers=None, skip_missing=False):
    """Issue a GET request through ``session`` and return the decoded JSON body.

    Args:
        session: Client session object exposing an async-context-manager
            ``get`` method (the keyword arguments used here look like
            aiohttp's ``ClientSession`` -- TODO confirm against callers).
        url: Address to request.
        headers: Optional headers forwarded unchanged to ``session.get``.
        skip_missing: When true, a 404 response yields an empty dict
            instead of an error.

    Returns:
        The value produced by ``response.json()``, or ``{}`` when
        ``skip_missing`` is true and the response status is 404.

    Raises:
        Whatever the session / response raises for an error status. With
        ``skip_missing`` set, only 404 is tolerated; every other error
        status is still raised via ``response.raise_for_status()``.
    """
    # Trust store comes from certifi's CA bundle rather than the system one.
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with session.get(
        url,
        headers=headers,
        timeout=70,
        ssl_context=ssl_context,
        # Automatic raising must be off when skip_missing is set, otherwise
        # the 404 would raise before it can be inspected below.
        raise_for_status=not skip_missing,
    ) as response:
        if skip_missing:
            if response.status == 404:
                return {}
            # Automatic raising was disabled above, so non-404 error
            # statuses have to be surfaced explicitly; otherwise an error
            # payload would be returned as if it were a successful result.
            response.raise_for_status()
        return await response.json()
async def put(*, session, url, data, headers=None):
    """Issue a PUT request through ``session`` and return the decoded JSON body.

    Args:
        session: Client session object exposing an async-context-manager
            ``put`` method (presumably an aiohttp ``ClientSession`` --
            verify against callers).
        url: Address to request.
        data: Payload handed to ``session.put`` as its ``json`` argument.
        headers: Optional headers forwarded unchanged to ``session.put``.

    Returns:
        The value produced by awaiting ``response.json()``.
    """
    request_options = {
        "json": data,
        "headers": headers,
        "timeout": 70,
        # Verify the peer against certifi's CA bundle.
        "ssl_context": ssl.create_default_context(cafile=certifi.where()),
        # Ask the session to raise on error statuses instead of returning them.
        "raise_for_status": True,
    }
    async with session.put(url, **request_options) as reply:
        return await reply.json()
async def delete(*, session, url, data, headers=None):
    """Issue a DELETE request through ``session`` and return the decoded JSON body.

    Args:
        session: Client session object exposing an async-context-manager
            ``delete`` method (presumably an aiohttp ``ClientSession`` --
            verify against callers).
        url: Address to request.
        data: Payload handed to ``session.delete`` as its ``json`` argument.
        headers: Optional headers forwarded unchanged to ``session.delete``.

    Returns:
        The value produced by awaiting ``response.json()``.
    """
    ca_bundle_path = certifi.where()
    tls_context = ssl.create_default_context(cafile=ca_bundle_path)
    pending_request = session.delete(
        url,
        json=data,
        headers=headers,
        timeout=70,
        ssl_context=tls_context,
        raise_for_status=True,  # error statuses are raised by the session
    )
    async with pending_request as reply:
        payload = await reply.json()
    return payload