#!/usr/bin/env python
"""
Parse an iTunes library and produce JSON files to send to Elasticsearch.

The iTunes library property-list file is read, the music tracks are kept,
and per-artist and per-album aggregates are computed. Each of the three
collections (songs, artists, albums) is written to its own file in the
Elasticsearch bulk format: one "index" action line followed by one document
line, repeated for every document.

Rating note:
    For album and artist data, 'Rating' is the average rating over *all*
    songs of the album or artist. So if 10 songs of an album are rated and
    2 are not, 'Rating' is the sum of the ratings divided by 12.

TODO: Store the number of rated songs, and a 'Rating' computed over rated
      songs only.

Example track info:
{
    'Album': 'Nirvana',
    'Persistent ID': 'A50FE1436726815C',
    'Track Number': 4,
    'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3',
    'File Folder Count': 4,
    'Album Rating Computed': True,
    'Total Time': 134295,
    'Sample Rate': 44100,
    'Genre': 'Rock/Alternative',
    'Bit Rate': 236,
    'Kind': 'MPEG audio file',
    'Name': 'Sliver',
    'Artist': 'Nirvana',
    'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38),
    'Album Rating': 60,
    'Rating': 40,
    'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41),
    'Library Folder Count': 1,
    'Year': 2002,
    'Track ID': 7459,
    'Size': 3972838,
    'Track Type': 'File',
    'Play Count': 2,
    'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00)
}
"""

import argparse
import datetime
import hashlib
import io
import json
import os
import plistlib
import sys


class SetEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes sets and datetimes."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for ``obj``.

        Sets become lists and ``datetime.datetime`` values become ISO 8601
        strings; anything else is delegated to the base encoder, which
        raises ``TypeError``.
        """
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return super().default(obj)


class ITunesParser:
    """
    Parse an iTunes library and aggregate songs, albums and artists.
    """
    SONG_INDEX = 'itunes-songs'
    ALBUM_INDEX = 'itunes-albums'
    ARTIST_INDEX = 'itunes-artists'
    # TODO Put variables in a config file or in a python library

    def __init__(self):
        # Songs are keyed by 'Persistent ID'; albums and artists are keyed
        # by their name.
        self._tracks = {}
        self._albums = {}
        self._artists = {}

    def _read_tracks(self, library_file):
        """
        Read the library file and return the value of its 'Tracks' key.

        The library dict may contain
        - Major Version
        - Minor Version
        - Date
        - Application Version
        - Features
        - Show Content Ratings
        - Music Folder
        - Library Persistent ID
        - Tracks
        - ...

        Raises ``KeyError`` if the library has no 'Tracks' entry.
        """
        # The context manager closes the file even if parsing fails.
        with open(library_file, 'rb') as library:
            plist = plistlib.load(library)
        return plist['Tracks']

    def parse(self, library_file):
        """
        Parse ``library_file`` and return the collected data.

        Returns a dict with the keys 'songs', 'albums' and 'artists', each
        mapping to the corresponding internal dict of this parser. Every
        kept track is also fed to _process_artist and _process_album.

        TODO Just return a _correct_ JSON and treat in another place/class
        """
        tracks = self._read_tracks(library_file)
        for track in tracks.values():
            # Filter out any non-music. An entry without a 'Track Type' is
            # skipped as well instead of aborting the run with a KeyError.
            if track.get('Track Type') != 'File':
                continue
            if 'Podcast' in track or 'Has Video' in track:
                continue

            # Each kept track is stored under its persistent ID
            self._tracks[track['Persistent ID']] = track
            # Fold the track into its artist's aggregate
            self._process_artist(track)
            # Fold the track into its album's aggregate
            self._process_album(track)

        return {
            'songs': self._tracks,
            'albums': self._albums,
            'artists': self._artists
        }

    def _process_artist(self, track):
        """
        Update the artist aggregate with the information of one track.

        The artist is taken from 'Album Artist' when present, otherwise
        from 'Artist'; a track with neither is ignored. Nothing is returned:
        the aggregate is updated in ``self._artists``.
        """
        if 'Album Artist' not in track and 'Artist' not in track:
            return

        akey = track['Album Artist'] if 'Album Artist' in track else track['Artist']

        # Add artist
        if akey not in self._artists:
            # The artist name is the dict key, so the ID is only calculated
            # once per artist rather than once per track.
            self._artists[akey] = {
                'Persistent ID': self.calc_id(akey),
                'Name': akey,
                'Artist': akey,
                'Track Count': 0,
                'Play Count': 0,
                'Rating': 0,
                'Genre': set(),
                'Album': set()
            }
        artist = self._artists[akey]

        # Compute information; an unrated track counts as a rating of 0.
        # The average must be updated before 'Track Count' is incremented.
        artist['Rating'] = self.calc_rating(
            track.get('Rating', 0), artist['Rating'], artist['Track Count'])
        artist['Track Count'] += 1
        artist['Play Count'] += track.get('Play Count', 0)

        if 'Genre' in track:
            # Split up the genres ('Rock/Alternative' -> Rock, Alternative)
            artist['Genre'] |= set(track['Genre'].split('/'))

        if 'Album' in track:
            artist['Album'].add(track['Album'])

    def _process_album(self, track):
        """
        Update the album aggregate with the information of one track.

        A track without an 'Album' is ignored. Nothing is returned: the
        aggregate is updated in ``self._albums``.
        """
        if 'Album' not in track:
            return

        akey = track['Album']

        if akey not in self._albums:
            # The album name is the dict key, so the ID is only calculated
            # once per album rather than once per track.
            self._albums[akey] = {
                'Persistent ID': self.calc_id(akey),
                'Name': akey,
                'Album': akey,
                'Track Count': 0,
                'Play Count': 0,
                'Rating': 0,
                'Genre': set(),
                'Artist': set(),
                # 'Album Artist' is only added when a track provides it
                'Total Time': 0
            }
        album = self._albums[akey]

        # Compute information; an unrated track counts as a rating of 0.
        # The average must be updated before 'Track Count' is incremented.
        album['Rating'] = self.calc_rating(
            track.get('Rating', 0), album['Rating'], album['Track Count'])
        album['Track Count'] += 1
        album['Play Count'] += track.get('Play Count', 0)
        album['Total Time'] += track.get('Total Time', 0)

        if 'Genre' in track:
            # Split up the genres ('Rock/Alternative' -> Rock, Alternative)
            album['Genre'] |= set(track['Genre'].split('/'))

        if 'Artist' in track:
            album['Artist'].add(track['Artist'])

        if 'Album Rating' in track:
            album['Album Rating'] = track['Album Rating']
            album['Album Rating Computed'] = True

        if 'Album Artist' in track:
            album['Album Artist'] = track['Album Artist']

    @classmethod
    def calc_rating(cls, added_value, current_rating, count):
        """
        Return the new average after adding one value to a running average.

        ``current_rating`` is the average of ``count`` elements and
        ``added_value`` is the element to add.
        """
        return (current_rating * count + added_value) / (count + 1)

    @classmethod
    def calc_id(cls, key):
        """
        Return the hexadecimal MD5 digest of ``key`` (UTF-8), used as an ID.
        """
        md5 = hashlib.md5()
        md5.update(key.encode('UTF-8'))
        return md5.hexdigest()


class WriteElsJson:
    """Write collections as Elasticsearch bulk files (action + document)."""

    @staticmethod
    def _write_bulk(entries, index_name, output_file):
        """
        Write ``(doc_id, document)`` pairs to ``output_file``.

        For every pair two UTF-8 encoded lines are written: the "index"
        action targeting ``index_name`` with ``doc_id`` as "_id", then the
        document itself.
        """
        # The context manager closes the file even if serialization fails.
        with io.open(output_file, 'wb') as bulk_file:
            for doc_id, document in entries:
                action = {
                    "index": {"_index": index_name, "_id": doc_id}
                }
                for payload in (action, document):
                    line = json.dumps(payload, indent=None, cls=SetEncoder) + "\n"
                    bulk_file.write(line.encode('UTF-8'))

    @staticmethod
    def _with_rounded_rating(document):
        """Return a shallow copy of ``document`` with 'Rating' rounded."""
        rounded = dict(document)
        rounded['Rating'] = round(document['Rating'])
        return rounded

    @staticmethod
    def write_artists(artists, output_file):
        """
        Write artists data to a bulk JSON file.

        The average 'Rating' is rounded in the written document only; the
        dicts passed in are left untouched.
        """
        entries = (
            (artist['Persistent ID'], WriteElsJson._with_rounded_rating(artist))
            for artist in artists.values()
        )
        WriteElsJson._write_bulk(entries, ITunesParser.ARTIST_INDEX, output_file)

    @staticmethod
    def write_albums(albums, output_file):
        """
        Write albums data to a bulk JSON file.

        The average 'Rating' is rounded in the written document only; the
        dicts passed in are left untouched.
        """
        entries = (
            (album['Persistent ID'], WriteElsJson._with_rounded_rating(album))
            for album in albums.values()
        )
        WriteElsJson._write_bulk(entries, ITunesParser.ALBUM_INDEX, output_file)

    @staticmethod
    def write_songs(songs, output_file):
        """
        Write songs to a bulk JSON file, using each dict key as the "_id".
        """
        WriteElsJson._write_bulk(songs.items(), ITunesParser.SONG_INDEX, output_file)


#### main block ####

# Default input & output files
DEFAULT_LIBRARY_FILE_NAME = 'iTunesLibrary.xml'
DEFAULT_OUTPUT_FILE_NAME = '/es-music-data.json'
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
DEFAULT_OUTPUT_FILE = os.path.dirname(os.path.realpath(__file__)) + DEFAULT_OUTPUT_FILE_NAME

# Get options
parser = argparse.ArgumentParser(description="""
    Parse an iTunes XML library file to produce JSON file for ELS bulk operation.
    """)
parser.add_argument('-f', '--file', default=DEFAULT_LIBRARY_FILE,
                    help='iTunes Library XML file path (default: ./' + DEFAULT_LIBRARY_FILE_NAME + ')')
# NOTE: --output and --console are accepted but currently ignored; the three
# output file names used by main() are fixed.
parser.add_argument('-o', '--output', default=DEFAULT_OUTPUT_FILE,
                    help='Output to file (default: .' + DEFAULT_OUTPUT_FILE_NAME + ')')
parser.add_argument('-c', '--console', action='store_true',
                    help='Output to console instead of file')
# TODO: add a -v/--verbose option


def main():
    """Parse the library given on the command line and write the JSON files."""
    args = parser.parse_args()

    print("Parsing file '{}'...".format(args.file))
    library = ITunesParser().parse(args.file)

    print("Writing JSON files...")
    WriteElsJson.write_songs(library['songs'], "es-songs.json")
    WriteElsJson.write_artists(library['artists'], "es-artists.json")
    WriteElsJson.write_albums(library['albums'], "es-albums.json")

    print('Done!')


if __name__ == '__main__':
    main()