-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_anime_data.py
144 lines (113 loc) · 4.22 KB
/
get_anime_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import os
import pickle
import random
import timeit
import aiohttp
import uvloop
from schema import Anime, Genre, RateLimitError
# Host of the Jikan API; the request path is supplied separately on each call.
JIKAN_BASE_URL = "https://api.jikan.moe"
# Query parameters merged into every "/v4/top/anime" request made below.
JIKAN_GET_TOP_ANIME_REQ_PARAMS = {
    "limit": 25,  # entries per page; 25 is max
    # NOTE(review): presumably asks the API to leave out NSFW entries — confirm
    # against the Jikan documentation.
    "sfw": "true",
}
# Passed by main() as the `limit` argument of jikan_get_top_anime().
LIMIT = 30_000
async def jikan_get_genres() -> list[Genre]:
    """Fetch the anime genre list from Jikan and parse every entry.

    Returns:
        One object produced by ``Genre.parse`` per item of the response's
        ``"data"`` array; an empty list when the response has no ``"data"``
        key.
    """
    async with aiohttp.ClientSession(JIKAN_BASE_URL) as session:
        # Use the versioned "/v4" path, like every other request in this
        # module; the base URL itself carries no version prefix.
        async with session.get("/v4/genres/anime") as response:
            genres = await response.json()
    return [Genre.parse(genre) for genre in genres.get("data", [])]
async def jikan_get_top_anime_page(page: int = 1) -> list[Anime]:
    """Fetch one page of the Jikan top-anime list, retrying until it succeeds.

    Args:
        page: Page number sent as the ``page`` query parameter.

    Returns:
        The items of the response's ``"data"`` array, each converted with
        ``Anime.parse``.

    The function never gives up: an HTTP 429 answer is retried after
    3 seconds, any other failure (network error, undecodable body, missing
    ``"data"``, parse error) after 10 seconds.
    """
    # Retry with a loop rather than by calling ourselves again, so a long
    # outage cannot grow the call stack until RecursionError.
    while True:
        try:
            async with aiohttp.ClientSession(JIKAN_BASE_URL) as session:
                async with session.get(
                    "/v4/top/anime",
                    params={"page": page} | JIKAN_GET_TOP_ANIME_REQ_PARAMS,
                ) as response:
                    # Look at the status before decoding the body: a 429
                    # answer is a rate limit whether or not its body is JSON.
                    if response.status == 429:
                        raise RateLimitError
                    data = await response.json()
            return [Anime.parse(anime) for anime in data.get("data")]
        except RateLimitError:
            await asyncio.sleep(3)
        except Exception as e:
            # Report before sleeping so the cause is visible immediately.
            print(f"retrying page {page} because of error {e}")
            await asyncio.sleep(10)
async def jikan_get_top_anime(limit: int = 500) -> list[Anime]:
    """Fetch the top-anime list from Jikan, page by page, concurrently.

    Args:
        limit: Approximate number of entries wanted. Whole pages are
            fetched, so the result may hold more than ``limit`` entries
            (it is not truncated) or fewer when the API has fewer pages.

    Returns:
        All entries of the fetched pages. Pages are appended in the order
        their requests complete, so the list is not guaranteed to be in
        ranking order.
    """
    per_page = JIKAN_GET_TOP_ANIME_REQ_PARAMS.get("limit", 25)

    # Probe request: only its pagination block is used. It is sent with the
    # same page size as the real page requests so that "last_visible_page"
    # counts pages of that size.
    async with aiohttp.ClientSession(JIKAN_BASE_URL) as session:
        async with session.get(
            "/v4/top/anime", params=JIKAN_GET_TOP_ANIME_REQ_PARAMS
        ) as response:
            data = await response.json()
    total_pages = data["pagination"]["last_visible_page"]

    # Ceiling division: number of pages needed to cover `limit` entries.
    pages_wanted = -(-limit // per_page)
    last_page = min(total_pages, pages_wanted)

    animes = []
    for paged_animes in asyncio.as_completed(
        [jikan_get_top_anime_page(page) for page in range(1, last_page + 1)]
    ):
        animes.extend(await paged_animes)
        # "\r" keeps the progress counter on a single terminal line.
        print(f"Fetched {len(animes)} animes", end="\r")
    # Instead of returning animes[:limit], return everything that was fetched
    return animes
async def get_image(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        The bytes of the response body. The HTTP status is not inspected,
        so the body of an error response is returned as-is.

    Any exception raised while requesting or reading is printed and the
    download is retried after a random 3-7 second pause, without an upper
    bound on the number of attempts.
    """
    # Loop instead of recursing so repeated failures do not deepen the
    # call stack.
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.read()
        except Exception as e:
            print(f"Error downloading image from {url}: {e}")
            await asyncio.sleep(random.randint(3, 7))
async def download_anime_image(anime: Anime):
    """Save the image of ``anime`` to ``images/<mal_id>.jpg`` unless it exists.

    Args:
        anime: Object providing ``mal_id`` (used for the file name) and
            ``image_url`` (passed to ``get_image``).

    Returns:
        None. Does nothing when the target file is already present.

    Failures are printed and retried after a random 3-7 second pause,
    without an upper bound on the number of attempts.
    """
    fpath = f"images/{anime.mal_id}.jpg"
    if os.path.exists(fpath):
        return

    # Write to a side file and rename it into place only after the write
    # succeeded. Writing straight to `fpath` would leave a truncated file
    # behind on error, which the existence check above would then accept as
    # a finished download.
    tmp_path = f"{fpath}.part"
    image = None
    while True:
        try:
            # In order to not allocate a file handle until we need it, await
            # the result of get_image() before opening the file. The bytes
            # are kept across retries so a disk error does not trigger a
            # second download.
            if image is None:
                image = await get_image(anime.image_url)
            with open(tmp_path, "wb") as f:
                f.write(image)
            os.replace(tmp_path, fpath)
            return
        except Exception as e:
            print(f"FATAL: Error saving downloaded image to {fpath}: {e}")
            await asyncio.sleep(random.randint(3, 7))
async def download_anime_images(animes: list[Anime]):
    """Download the images of all ``animes`` concurrently.

    One ``download_anime_image`` coroutine is created per entry and all of
    them are handed to ``asyncio.gather`` at once; the call returns when
    every one has finished.
    """
    downloads = map(download_anime_image, animes)
    await asyncio.gather(*downloads)
async def main():
    """Load or fetch the anime list, download every image, print timings.

    The list is read from ``animes.pkl`` when that file exists; otherwise it
    is fetched with ``jikan_get_top_anime(limit=LIMIT)`` and pickled to that
    file so later runs can skip the API calls.
    """
    started_at = timeit.default_timer()
    cache_path = "animes.pkl"

    if not os.path.exists(cache_path):
        print("fetching animes from jikan api")
        animes = await jikan_get_top_anime(limit=LIMIT)
        # Keep a copy on disk so the next run can start from the cache.
        with open(cache_path, "wb") as cache_file:
            pickle.dump(animes, cache_file)
    else:
        print("loading animes from file")
        # NOTE: unpickling can execute arbitrary code; this is only
        # acceptable because the file is one this script wrote itself.
        with open(cache_path, "rb") as cache_file:
            animes = pickle.load(cache_file)

    fetched_at = timeit.default_timer()

    print(f"downloading {len(animes)} images")
    await download_anime_images(animes)

    finished_at = timeit.default_timer()
    total_seconds = finished_at - started_at
    fetch_seconds = fetched_at - started_at
    print(
        "done after",
        total_seconds,
        "seconds. anime fetching took",
        fetch_seconds,
        "seconds",
    )
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # download_anime_image() writes into "images/", so make sure the
    # directory exists before any download starts.
    os.makedirs("images", exist_ok=True)
    # Drive the async main() to completion via uvloop's runner.
    uvloop.run(main())