96 lines
2.9 KiB
Python
96 lines
2.9 KiB
Python
import base64
|
|
import os
|
|
from typing import List, Optional
|
|
import requests
|
|
import json
|
|
import re
|
|
|
|
from urllib import parse
|
|
|
|
|
|
TYPESENSE_URL = "http://localhost:8108"
|
|
COLLECTION = "albums"
|
|
HEADERS = {"X-TYPESENSE-API-KEY": "toto", "Content-Type": "application/json"}
|
|
|
|
def get_albums(offset = 0) -> List[tuple[str, str, str]]:
|
|
params = {"q": "*", "per_page": 250, "offset": offset}
|
|
q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/search"
|
|
# print(q_url)
|
|
req = requests.get(q_url, params=params, headers=HEADERS)
|
|
|
|
ret = [ (hit['document']['id'], hit['document']['Album'], hit['document']['Artist']) for hit in req.json()['hits']]
|
|
return ret
|
|
|
|
def get_album_location(id: int) -> str:
|
|
q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/{id}"
|
|
# print(q_url)
|
|
req = requests.get(q_url, headers=HEADERS)
|
|
|
|
if "Cover" in req.json():
|
|
return
|
|
|
|
return req.json()["Location"]
|
|
|
|
|
|
def patch_album(id: int, cover: bytes) -> bool:
|
|
q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/{id}"
|
|
# print(q_url)
|
|
payload = {"Cover": cover.decode()}
|
|
payload = json.dumps(payload)
|
|
req = requests.patch(q_url, headers=HEADERS, data=payload)
|
|
|
|
return req.status_code == 200
|
|
|
|
|
|
def sanitize_location(location: str) -> Optional[str]:
|
|
result = parse.unquote(location, encoding='utf-8', errors='replace')
|
|
result = result.replace("file://localhost/", "")
|
|
result = result.replace(":", "")
|
|
result = re.sub("^([A-Z])", lambda m: m.group(0).lower(), result)
|
|
result = result.replace("%20", " ")
|
|
result = "/mnt/" + result
|
|
result = re.sub('/Disc [0-9]*$', "", result)
|
|
|
|
for file in ["/Pochette.jpg", "/cover.jpg", "/Pochette.png"]:
|
|
if os.path.isfile(result + file):
|
|
result = result + file
|
|
return result
|
|
return None
|
|
|
|
|
|
def get_base64(location: str) -> bytes:
|
|
with open(location, "rb") as image_file:
|
|
encoded_cover = base64.b64encode(image_file.read())
|
|
return encoded_cover
|
|
|
|
def process_one_album(id: int) -> bool:
|
|
raw_loc = get_album_location(id)
|
|
if not raw_loc:
|
|
return False
|
|
loc = sanitize_location(raw_loc)
|
|
if not loc:
|
|
return False
|
|
cover = get_base64(loc)
|
|
if patch_album(id, cover):
|
|
return True
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
# process_one_album(2151)
|
|
elements = get_albums()
|
|
processed = 0
|
|
while len(elements) == 250:
|
|
for id, name, artist in elements:
|
|
processed += 1
|
|
raw_loc = get_album_location(id)
|
|
if not raw_loc:
|
|
continue
|
|
loc = sanitize_location(raw_loc)
|
|
if not loc:
|
|
continue
|
|
print(f"Try {id} - {name} ({artist})")
|
|
cover = get_base64(loc)
|
|
if patch_album(id, cover):
|
|
print(f"Album {id} - {name} ({artist}) patched successfully")
|
|
elements = get_albums(offset = processed)
|