#!/usr/bin/env python
"""
Parse an iTunes library and produce JSON files adapted for Elasticsearch.

Rating note:
For artist data, 'Rating' is the average rating over *all* songs of the
artist. So, if 10 songs are rated and 2 are not, 'Rating' is the sum of the
ratings divided by 12. Album entries do not compute an average; they carry the
'Album Rating' value found on their tracks.

TODO: Add information to store the number of rated songs, and a 'Rating'
computed over rated songs only.

Parses an iTunes library XML file and generates a JSON file for use in the
D3.js JavaScript library.

Example Track info:
{
    'Album': 'Nirvana',
    'Persistent ID': 'A50FE1436726815C',
    'Track Number': 4,
    'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3',
    'File Folder Count': 4,
    'Album Rating Computed': True,
    'Total Time': 134295,
    'Sample Rate': 44100,
    'Genre': 'Rock/Alternative',
    'Bit Rate': 236,
    'Kind': 'MPEG audio file',
    'Name': 'Sliver',
    'Artist': 'Nirvana',
    'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38),
    'Album Rating': 60,
    'Rating': 40,
    'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41),
    'Library Folder Count': 1,
    'Year': 2002,
    'Track ID': 7459,
    'Size': 3972838,
    'Track Type': 'File',
    'Play Count': 2,
    'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00)
}
"""

import argparse
import datetime
import hashlib
import io
import json
import os
import plistlib


class SetEncoder(json.JSONEncoder):
    """JSON encoder that also serializes ``set`` and ``datetime`` values."""

    def default(self, obj):
        """
        Convert objects the stock encoder rejects.

        Sets become lists (sorted when their members are orderable, so the
        generated files are identical from one run to the next) and datetimes
        become ISO 8601 strings. Anything else is delegated to the base class,
        which raises ``TypeError``.
        """
        if isinstance(obj, set):
            try:
                return sorted(obj)
            except TypeError:
                # Members of mixed, non-comparable types: keep set order.
                return list(obj)
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return super().default(obj)


class ITunesParser:
    """
    Parse an iTunes Library and collect songs, albums and artists - for ELS
    """

    SONG_INDEX = "itunes-songs"
    ALBUM_INDEX = "itunes-albums"
    ARTIST_INDEX = "itunes-artists"
    # TODO Put variables in a config files or in a python library

    def __init__(self):
        # Each mapping is keyed by a "Persistent ID": the one provided by the
        # library for songs, an MD5 of the name for albums and artists.
        self._tracks = {}
        self._albums = {}
        self._artists = {}

    def _read_tracks(self, library_file):
        """
        Read the library file and return the value of its 'Tracks' key.

        The library dict may contain
        - Major Version
        - Minor Version
        - Date
        - Application Version
        - Features
        - Show Content Ratings
        - Music Folder
        - Library Persistent ID
        - Tracks
        - ...

        Raises KeyError when the library has no 'Tracks' entry.
        """
        # The context manager closes the file even if parsing fails.
        with open(library_file, "rb") as library:
            plist = plistlib.load(library)
        return plist["Tracks"]

    def parse(self, library_file):
        """
        Parse the library and return the collected data as a dict with the
        keys 'songs', 'albums' and 'artists', each mapping a persistent ID to
        the corresponding entry.

        Only tracks whose 'Track Type' is 'File' are kept; podcasts and videos
        are skipped. This method calls _process_artist & _process_album for
        every kept track.

        TODO Just return a _correct_ JSON and treat in another place/class
        """
        tracks = self._read_tracks(library_file)

        for track in tracks.values():
            # Filter out any non-music (a track without a type is skipped too)
            if track.get("Track Type") != "File":
                continue
            if "Podcast" in track or "Has Video" in track:
                continue

            # Each kept track is stored
            self._tracks[track["Persistent ID"]] = track
            # Retrieve for each track artist information
            self._process_artist(track)
            # Retrieve for each track album information
            self._process_album(track)

        return {
            "songs": self._tracks,
            "albums": self._albums,
            "artists": self._artists,
        }

    def _process_artist(self, track):
        """
        Accumulate the track's data into its artist entry.

        The artist is the track's 'Album Artist' when present, otherwise its
        'Artist'; tracks with neither are ignored. Returns nothing: the result
        is stored in self._artists.
        """
        if "Album Artist" in track:
            artist_name = track["Album Artist"]
        elif "Artist" in track:
            artist_name = track["Artist"]
        else:
            return

        # The entry is keyed by the MD5 of the artist name, so every track of
        # the same artist lands on the same entry.
        persistent_id = self.calc_id(artist_name)
        if persistent_id not in self._artists:
            self._artists[persistent_id] = {
                "Persistent ID": persistent_id,
                "Name": artist_name,
                "Artist": artist_name,
                "Track Count": 0,
                "Play Count": 0,
                "Rating": 0,
                "Genre": set(),
                "Album": set(),
            }
        artist = self._artists[persistent_id]

        # Compute information; an unrated track counts as a rating of 0
        artist["Rating"] = self.calc_average(
            track.get("Rating", 0), artist["Rating"], artist["Track Count"]
        )
        artist["Track Count"] += 1
        artist["Play Count"] += track.get("Play Count", 0)

        if "Genre" in track:
            # Split up the Genres
            artist["Genre"] |= set(track["Genre"].split("/"))

        if "Album" in track:
            artist["Album"].add(track["Album"])

    def _process_album(self, track):
        """
        Accumulate the track's data into its album entry.

        Tracks without an 'Album' are ignored. Returns nothing: the result is
        stored in self._albums.

        Raises KeyError when the track has no 'Bit Rate'.
        """
        if "Album" not in track:
            return

        album_name = track["Album"]
        bit_rate = track["Bit Rate"]

        # The entry is keyed by the MD5 of the album name, so every track of
        # the same album name lands on the same entry.
        persistent_id = self.calc_id(album_name)
        if persistent_id not in self._albums:
            self._albums[persistent_id] = {
                "Persistent ID": persistent_id,
                "Name": album_name,
                "Album": album_name,
                "Track Count": 0,
                "Play Count": 0,
                "Genre": set(),
                "Artist": set(),
                "Avg Bit Rate": bit_rate,
                "Min Bit Rate": bit_rate,
                "Total Time": 0,
                "Location": "",
            }
        album = self._albums[persistent_id]

        # Compute information (the average must use the count *before* it is
        # incremented)
        album["Avg Bit Rate"] = self.calc_average(
            bit_rate, album["Avg Bit Rate"], album["Track Count"]
        )
        album["Track Count"] += 1
        album["Play Count"] += track.get("Play Count", 0)
        album["Total Time"] += track.get("Total Time", 0)
        album["Min Bit Rate"] = min(album["Min Bit Rate"], bit_rate)

        # Keep the previous location when this track does not provide one
        if "Location" in track:
            album["Location"] = os.path.dirname(track["Location"])

        if "Genre" in track:
            # Split up the Genres
            album["Genre"] |= set(track["Genre"].split("/"))

        if "Artist" in track:
            album["Artist"].add(track["Artist"])

        # Album-level values are copied as-is; the last track seen wins
        for key in ("Album Rating", "Album Rating Computed", "Album Artist"):
            if key in track:
                album[key] = track[key]

    @classmethod
    def calc_average(cls, added_value, current_value, nb_values):
        """
        Calculate the new average from the current average, the value to add
        and the number of values the current average was computed over
        """
        return (current_value * nb_values + added_value) / (nb_values + 1)

    @classmethod
    def calc_id(cls, key):
        """
        Calculate the MD5 hex digest of a key (UTF-8 encoded) to use as ID
        """
        md5 = hashlib.md5()
        md5.update(key.encode("UTF-8"))
        return md5.hexdigest()


class WriteElsJson:
    """Write collected elements to JSON files."""

    @staticmethod
    def write_elements(
        elements: dict,
        o_name: str,
        json_style: str,
        els_index=ITunesParser.SONG_INDEX,
        els=False,
    ):
        """
        Write the values of ``elements`` to the file ``<o_name>.<json_style>``.

        With ``json_style == "json"`` the file holds one JSON array, one
        element per line. With any other style (e.g. "jsonl") it holds one
        JSON document per line.

        ``els_index`` and ``els`` are accepted for interface compatibility but
        currently have no effect: the Elasticsearch bulk action line
        ({"index": {"_index": ..., "_id": ...}}) is not emitted.
        """
        output_filename = f"{o_name}.{json_style}"
        as_array = json_style == "json"
        # Inside an array the comma goes *between* elements only; a comma
        # after the last element would make the file invalid JSON.
        separator = b",\n" if as_array else b"\n"

        with io.open(output_filename, "wb") as ofile:
            if as_array:
                ofile.write(b"[\n")

            wrote_element = False
            for element in elements.values():
                if wrote_element:
                    ofile.write(separator)
                ofile.write(
                    bytes(json.dumps(element, indent=None, cls=SetEncoder), "UTF-8")
                )
                wrote_element = True
            if wrote_element:
                ofile.write(b"\n")

            if as_array:
                ofile.write(b"]\n")


#### main block ####

# Default input & output files
DEFAULT_LIBRARY_FILE_NAME = "iTunesLibrary.xml"
DEFAULT_OUTPUT_FILE_NAME = "/es-music-data.json"
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
DEFAULT_OUTPUT_FILE = (
    os.path.dirname(os.path.realpath(__file__)) + DEFAULT_OUTPUT_FILE_NAME
)

# Get options
parser = argparse.ArgumentParser(
    description="""
        Parse an iTunes XML library file to produce JSON file for ELS bulk operation.
        """
)
parser.add_argument(
    "-f",
    "--file",
    default=DEFAULT_LIBRARY_FILE,
    help="iTunes Library XML file path (default: ./" + DEFAULT_LIBRARY_FILE_NAME + ")",
)
parser.add_argument(
    "-o",
    "--output",
    default=DEFAULT_OUTPUT_FILE,
    help="Output to file (default: ." + DEFAULT_OUTPUT_FILE_NAME + ")",
)
parser.add_argument(
    "-c", "--console", action="store_true", help="Output to console instead of file"
)
parser.add_argument(
    "-F",
    "--format",
    choices=["json", "jsonl"],
    default="json",
    help="Choose JSON style",
)
# TODO: --output and --console are parsed but not acted upon yet; the files
# are always written as es-songs / es-artists / es-albums in the current
# directory.


def main(argv=None):
    """
    Parse the library given on the command line and write the three JSON files.

    ``argv`` is the argument list to parse; when None, argparse reads
    ``sys.argv``.
    """
    args = parser.parse_args(argv)

    print(f"Parsing file '{args.file}'...")
    library = ITunesParser().parse(args.file)

    print("Writing JSON files...")
    WriteElsJson.write_elements(library["songs"], "es-songs", args.format)
    WriteElsJson.write_elements(
        library["artists"], "es-artists", args.format, ITunesParser.ARTIST_INDEX
    )
    WriteElsJson.write_elements(
        library["albums"], "es-albums", args.format, ITunesParser.ALBUM_INDEX
    )

    print("Done!")


if __name__ == "__main__":
    main()