#!/usr/bin/env python """ Parse iTunes library and produce JSON adapted files to send to Elasticsearch Rating note: For albums and artists data, 'Rating' is the average rate for *all* songs in the album or of the artist. So, if in an album, 10 songs are evaluated and 2 not evaluated, 'Rating' will be the sum of rate divided by 12. TODO: Add informations to store number of evaluated songs, and 'Rating' for evaluated song. Parses an iTunes library XML file and generates a JSON file for use in the D3.js JavaScript library. Example Track info: { 'Album': 'Nirvana', 'Persistent ID': 'A50FE1436726815C', 'Track Number': 4, 'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3', 'File Folder Count': 4, 'Album Rating Computed': True, 'Total Time': 134295, 'Sample Rate': 44100, 'Genre': 'Rock/Alternative', 'Bit Rate': 236, 'Kind': 'MPEG audio file', 'Name': 'Sliver', 'Artist': 'Nirvana', 'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38), 'Album Rating': 60, 'Rating': 40, 'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41), 'Library Folder Count': 1, 'Year': 2002, 'Track ID': 7459, 'Size': 3972838, 'Track Type': 'File', 'Play Count': 2, 'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00) } """ import argparse import datetime import hashlib import io import json import os import plistlib class JsonCustomEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, set): return list(obj) if isinstance(obj, datetime.datetime): return obj.isoformat() # encoded_object = int(mktime(obj.timetuple())) return json.JSONEncoder.default(self, obj) class ITunesParser: """ Parse an iTunes Library and produce JSON - for ELS """ def __init__(self): self._tracks = {} self._albums = {} self._artists = {} def _read_tracks(self, library_file): """ Read library file and return Tracks key of dict. 
Dict may contains - Major Version - Minor Version - Date - Application Version - Features - Show Content Ratings - Music Folder - Library Persistent ID - Tracks - ... """ plist = plistlib.load(open(library_file, "rb")) return plist["Tracks"] def parse(self, library_file) -> dict: """ Return an output JSON for an ELS Bulk request - Not a correct format This method call process_album & process_artist TODO Just return a _correct_ JSON and treat in another place/class """ tracks = self._read_tracks(library_file) for _, track in tracks.items(): # Filter out any non-music if track["Track Type"] != "File": continue if "Podcast" in track or "Has Video" in track: continue # Each keeped track are stored self._tracks[track["Persistent ID"]] = track # Retrieve for each track artist information self._process_artist(track) # Retrieve for each track album information self._process_album(track) ret = {"songs": self._tracks, "albums": self._albums, "artists": self._artists} return ret def _process_artist(self, track): """ Process artists in the track part of library and return a JSON formated for a bulk ELS request """ if "Album Artist" not in track and "Artist" not in track: return akey = track["Album Artist"] if "Album Artist" in track else track["Artist"] persistent_id = self.calc_id(akey) if persistent_id not in self._artists: # Key is used to increment/precise some information # So we use artist name as a key to avoid calculating an ID for each track self._artists[persistent_id] = { "Persistent ID": persistent_id, "Name": akey, "Artist": akey, "Track Count": 0, "Play Count": 0, "Rating": 0, "Genre": set(), "Album": set(), } # Compute information play_count = track["Play Count"] if "Play Count" in track else 0 rating = track["Rating"] if "Rating" in track else 0 rating = self.calc_average( rating, self._artists[persistent_id]["Rating"], self._artists[persistent_id]["Track Count"], ) self._artists[persistent_id]["Track Count"] += 1 self._artists[persistent_id]["Rating"] = rating 
self._artists[persistent_id]["Play Count"] += play_count if "Genre" in track: # Split up the Genres genre_parts = track["Genre"].split("/") self._artists[persistent_id]["Genre"] |= set(genre_parts) if "Album" in track: self._artists[persistent_id]["Album"].add(track["Album"]) def _process_album(self, track): """ Process albums in the track part of library and return a JSON formated for a bulk ELS request """ if "Album" not in track: return akey = track["Album"] persistent_id = self.calc_id(akey) if persistent_id not in self._albums: # Key is used to increment/precise some information # So we use album name as a key to avoid calculating an ID for each track self._albums[persistent_id] = { "Persistent ID": persistent_id, "Name": akey, "Album": akey, "Track Count": 0, "Play Count": 0, "Genre": set(), "Artist": set(), "Avg Bit Rate": track["Bit Rate"], "Min Bit Rate": track["Bit Rate"], # 'Album Artist': '', "Total Time": 0, "Location": "", "Date Added": track["Date Added"], } # Compute information play_count = track["Play Count"] if "Play Count" in track else 0 total_time = track["Total Time"] if "Total Time" in track else 0 avg_bitrate = self.calc_average( track["Bit Rate"], self._albums[persistent_id]["Avg Bit Rate"], self._albums[persistent_id]["Track Count"], ) self._albums[persistent_id]["Avg Bit Rate"] = avg_bitrate self._albums[persistent_id]["Track Count"] += 1 self._albums[persistent_id]["Play Count"] += play_count self._albums[persistent_id]["Total Time"] += total_time self._albums[persistent_id]["Location"] = os.path.dirname(track["Location"]) if self._albums[persistent_id]["Min Bit Rate"] > track["Bit Rate"]: self._albums[persistent_id]["Min Bit Rate"] = track["Bit Rate"] if "Genre" in track: # Split up the Genres genre_parts = track["Genre"].split("/") self._albums[persistent_id]["Genre"] |= set(genre_parts) if "Artist" in track: self._albums[persistent_id]["Artist"].add(track["Artist"]) if "Album Rating" in track: self._albums[persistent_id]["Album 
Rating"] = track["Album Rating"] if "Album Rating Computed" in track: self._albums[persistent_id]["Album Rating Computed"] = track[ "Album Rating Computed" ] if "Album Artist" in track: self._albums[persistent_id]["Album Artist"] = track["Album Artist"] if "Date Added" in track: if track["Date Added"] < self._albums[persistent_id]["Date Added"]: print(f"Lower date for {akey} - {track['Album']}") self._albums[persistent_id]["Date Added"] = track["Date Added"] @classmethod def calc_average(cls, added_value, current_value, nb_values): """ Calculate average value from a current value, a value to add and the number of values """ return (current_value * nb_values + added_value) / (nb_values + 1) @classmethod def calc_id(cls, key): """ Calculate a MD5 sum from a key as ID """ md5 = hashlib.md5() md5.update(key.encode("UTF-8")) return md5.hexdigest() class WriteElsJson: @staticmethod def write_elements( elements: list, element_type: str, json_style: str, ): """ Write songs to a JSON """ output_filename = f"es-{element_type}.{json_style}" if json_style == "els": output_filename += ".json" json_dump_option = {"ensure_ascii": False, "cls": JsonCustomEncoder} with open(output_filename, "w", encoding="utf-8") as ofile: match json_style: case "json": json_str = json.dumps(elements, **json_dump_option) ofile.write( json_str.replace("}, {", "},\n{") ) # One line = one record case "jsonl": for el in elements: json.dump(el, ofile, **json_dump_option) ofile.write("\n") case "els": for el in elements: json_track_index = { "index": { "_index": f"itunes-{element_type}", "_id": el["Persistent ID"], } } json.dump(json_track_index, ofile, **json_dump_option) ofile.write("\n") json.dump(el, ofile, **json_dump_option) ofile.write("\n") case _: print("ERROR: no write format") #### main block #### # Default input & output files DEFAULT_LIBRARY_FILE_NAME = "iTunesLibrary.xml" DEFAULT_OUTPUT_FILE_NAME = "/es-music-data.json" DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME) 
DEFAULT_OUTPUT_FILE = (
    os.path.dirname(os.path.realpath(__file__)) + DEFAULT_OUTPUT_FILE_NAME
)

# Command-line options
parser = argparse.ArgumentParser(
    description="""
        Parse an iTunes XML library file to produce JSON file for ELS bulk operation.
        """
)
parser.add_argument(
    "-f",
    "--file",
    default=DEFAULT_LIBRARY_FILE,
    help=f"iTunes Library XML file path (default: ./{DEFAULT_LIBRARY_FILE_NAME})",
)
parser.add_argument(
    "-o",
    "--output",
    default=DEFAULT_OUTPUT_FILE,
    help=f"Output to file (default: .{DEFAULT_OUTPUT_FILE_NAME})",
)
parser.add_argument(
    "-c",
    "--console",
    action="store_true",
    help="Output to console instead of file",
)
parser.add_argument(
    "-F",
    "--format",
    choices=["json", "jsonl", "els"],
    default="json",
    help="Choose JSON style",
)

if __name__ == "__main__":
    args = parser.parse_args()

    print(f"Parsing file '{args.file}'...")
    library = ITunesParser().parse(args.file)

    # NOTE: --output and --console are parsed but not used here; the files
    # are named and written by WriteElsJson.write_elements.
    print("Writing JSON files...")
    for element_type in ("songs", "artists", "albums"):
        WriteElsJson.write_elements(
            list(library[element_type].values()), element_type, args.format
        )

    print("Done!")