"""Attach base64-encoded cover art to album documents stored in Typesense.

The script pages through the ``albums`` collection, resolves each album's
``Location`` field to a local directory, looks for a cover image there and,
when one is found, patches it into the document's ``Cover`` field.
"""
import base64
import json
import os
import re
from typing import List, Optional
from urllib import parse

import requests

TYPESENSE_URL = "http://localhost:8108"
COLLECTION = "albums"
HEADERS = {"X-TYPESENSE-API-KEY": "toto", "Content-Type": "application/json"}

# Number of documents requested per search call; a shorter page is the last.
PAGE_SIZE = 250
# Cover file names probed inside an album directory, in priority order.
COVER_FILENAMES = ("/Pochette.jpg", "/cover.jpg", "/Pochette.png")


def get_albums(offset: int = 0) -> List[tuple[str, str, str]]:
    """Return one page of albums as ``(id, Album, Artist)`` tuples.

    Args:
        offset: Index of the first hit to return, forwarded to the search
            endpoint as the ``offset`` query parameter.

    Returns:
        At most ``PAGE_SIZE`` tuples built from the ``hits`` of the response.

    Raises:
        KeyError: If the response has no ``hits`` key or a hit lacks one of
            the expected document fields.
    """
    params = {"q": "*", "per_page": PAGE_SIZE, "offset": offset}
    q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/search"
    req = requests.get(q_url, params=params, headers=HEADERS)
    documents = (hit["document"] for hit in req.json()["hits"])
    return [(doc["id"], doc["Album"], doc["Artist"]) for doc in documents]


def get_album_location(id: int) -> Optional[str]:
    """Return the ``Location`` of an album that still needs a cover.

    Args:
        id: Identifier of the album document.

    Returns:
        The document's ``Location`` value, or ``None`` when the document
        already has a ``Cover`` field or has no ``Location`` field.
    """
    q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/{id}"
    # Decode the response body once instead of once per field access.
    document = requests.get(q_url, headers=HEADERS).json()
    if "Cover" in document:
        # Already patched: signal "nothing to do" to the caller.
        return None
    # Callers skip falsy results, so a missing field is reported as None
    # rather than raising KeyError in the middle of a batch run.
    return document.get("Location")


def patch_album(id: int, cover: bytes) -> bool:
    """Store *cover* in the ``Cover`` field of the album document.

    Args:
        id: Identifier of the album document.
        cover: Base64-encoded image bytes; decoded to text for the JSON body.

    Returns:
        ``True`` when the server answers with HTTP 200, ``False`` otherwise.
    """
    q_url = f"{TYPESENSE_URL}/collections/{COLLECTION}/documents/{id}"
    payload = json.dumps({"Cover": cover.decode()})
    req = requests.patch(q_url, headers=HEADERS, data=payload)
    return req.status_code == 200


def sanitize_location(location: str) -> Optional[str]:
    """Map a stored album location to the path of an existing cover file.

    The location is percent-decoded, the ``file://localhost/`` prefix and all
    colons are removed, a leading ``A-Z`` letter is lower-cased, the result is
    prefixed with ``/mnt/`` and a trailing ``/Disc <digits>`` component is
    dropped. Each name in ``COVER_FILENAMES`` is then probed in that directory.

    Args:
        location: Raw ``Location`` value of an album document.

    Returns:
        The path of the first cover file that exists, or ``None`` when none
        of the candidate files is present.
    """
    result = parse.unquote(location, encoding="utf-8", errors="replace")
    result = result.replace("file://localhost/", "")
    result = result.replace(":", "")
    result = re.sub(r"^([A-Z])", lambda m: m.group(0).lower(), result)
    # Catches "%20" sequences still present after the first decoding pass.
    result = result.replace("%20", " ")
    result = "/mnt/" + result
    # The cover is looked up in the album directory, not in a per-disc one.
    result = re.sub(r"/Disc [0-9]*$", "", result)
    for file in COVER_FILENAMES:
        candidate = result + file
        if os.path.isfile(candidate):
            return candidate
    return None


def get_base64(location: str) -> bytes:
    """Return the base64 encoding of the file at *location*.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(location, "rb") as image_file:
        return base64.b64encode(image_file.read())


def _locate_cover(id) -> Optional[str]:
    """Return the cover file path for album *id*, or ``None`` to skip it."""
    raw_loc = get_album_location(id)
    if not raw_loc:
        return None
    return sanitize_location(raw_loc)


def process_one_album(id: int) -> bool:
    """Find the cover of a single album and patch it into its document.

    Args:
        id: Identifier of the album document.

    Returns:
        ``True`` when a cover was found and the patch request succeeded,
        ``False`` when the album was skipped or the patch failed.
    """
    loc = _locate_cover(id)
    if not loc:
        return False
    return patch_album(id, get_base64(loc))


def main() -> None:
    """Patch a cover into every album of the collection that lacks one."""
    offset = 0
    while True:
        elements = get_albums(offset=offset)
        for album_id, name, artist in elements:
            loc = _locate_cover(album_id)
            if not loc:
                continue
            print(f"Try {album_id} - {name} ({artist})")
            cover = get_base64(loc)
            if patch_album(album_id, cover):
                print(f"Album {album_id} - {name} ({artist}) patched successfully")
        offset += len(elements)
        # Stop only AFTER handling the page: a short (or empty) page is the
        # last one, and its albums must still be processed.
        if len(elements) < PAGE_SIZE:
            break


if __name__ == "__main__":
    main()