Files
iTunes/iTunesParser.py
Maxence G. de Montauzan 56050d0a49 Suggester: take Album Artist
But it's not OK in dashboard ->
For example, search 'ayache' (for Superbus)
=> Result display 'Superbus' and we don't understand why
2021-08-23 01:22:58 +02:00

354 lines
12 KiB
Python

#!/usr/bin/env python
"""
Parse an iTunes library and produce JSON files suitable for sending to Elasticsearch.
Rating note:
For album and artist data, 'Rating' is the average rating over *all* songs of the album or of the artist.
So if, in an album, 10 songs are rated and 2 are not rated, 'Rating' will be the sum of the ratings divided by 12.
TODO: Add information to store the number of rated songs, and the 'Rating' computed over rated songs only.
Parses an iTunes library XML file and generates a JSON file
for use in the D3.js JavaScript library.
Example Track info:
{
'Album': 'Nirvana',
'Persistent ID': 'A50FE1436726815C',
'Track Number': 4,
'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3',
'File Folder Count': 4,
'Album Rating Computed': True,
'Total Time': 134295,
'Sample Rate': 44100,
'Genre': 'Rock/Alternative',
'Bit Rate': 236,
'Kind': 'MPEG audio file',
'Name': 'Sliver',
'Artist': 'Nirvana',
'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38),
'Album Rating': 60,
'Rating': 40,
'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41),
'Library Folder Count': 1,
'Year': 2002,
'Track ID': 7459,
'Size': 3972838,
'Track Type': 'File',
'Play Count': 2,
'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00)
}
"""
import datetime
import io
import json
import os
import plistlib
import sys
import argparse
import hashlib
class SetEncoder(json.JSONEncoder):
    """
    JSON encoder that additionally understands sets and datetimes.

    Sets are emitted as JSON arrays and ``datetime.datetime`` values as
    ISO 8601 strings; everything else is left to the standard encoder.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*, or defer to the base class."""
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, set):
            return list(obj)
        # Unknown type: the base implementation raises TypeError
        return super().default(obj)
class ITunesParser:
    """
    Parse an iTunes Library and aggregate song, album and artist data - for ELS

    The parser keeps three dictionaries that are filled by ``parse``:
    songs keyed by their 'Persistent ID', albums keyed by album name and
    artists keyed by artist name.
    """
    SONG_INDEX = 'itunes-songs'
    ALBUM_INDEX = 'itunes-albums'
    ARTIST_INDEX = 'itunes-artists'
    # TODO Put variables in a config files or in a python library

    def __init__(self):
        self._tracks = {}
        self._albums = {}
        self._artists = {}
        # Per album: how many tracks carried a 'Bit Rate'. Kept outside the
        # album documents so the exported JSON does not gain an extra field.
        self._album_bitrate_samples = {}

    def _read_tracks(self, library_file):
        """
        Read library file and return Tracks key of dict.

        Dict may contains
        - Major Version
        - Minor Version
        - Date
        - Application Version
        - Features
        - Show Content Ratings
        - Music Folder
        - Library Persistent ID
        - Tracks
        - ...

        Raises KeyError when the library has no 'Tracks' entry.
        """
        # Context manager so the file handle is released even if parsing fails
        with open(library_file, 'rb') as library:
            plist = plistlib.load(library)
        return plist['Tracks']

    def parse(self, library_file):
        """
        Read the library and return the aggregated data.

        Returns a dict with the keys 'songs', 'albums' and 'artists'; each
        value is the matching internal dictionary. This method calls
        _process_artist & _process_album for every kept track.
        TODO Just return a _correct_ JSON and treat in another place/class
        """
        tracks = self._read_tracks(library_file)
        for track in tracks.values():
            # Filter out any non-music (a track without 'Track Type' is skipped too)
            if track.get('Track Type') != 'File':
                continue
            if 'Podcast' in track or 'Has Video' in track:
                continue
            # Each kept track is stored
            self._tracks[track['Persistent ID']] = track
            # Retrieve for each track artist information
            self._process_artist(track)
            # Retrieve for each track album information
            self._process_album(track)
        return {
            'songs': self._tracks,
            'albums': self._albums,
            'artists': self._artists
        }

    def _process_artist(self, track):
        """
        Merge one track into the per-artist aggregates.

        The artist key is the track's 'Album Artist' when present, otherwise
        its 'Artist'; tracks with neither are ignored. 'Rating' is a running
        average over all tracks of the artist (an unrated track counts as 0).
        """
        if 'Album Artist' in track:
            akey = track['Album Artist']
        elif 'Artist' in track:
            akey = track['Artist']
        else:
            return
        artist = self._artists.get(akey)
        if artist is None:
            # Key is used to increment/precise some information
            # So we use artist name as a key to avoid calculating an ID for each track
            artist = self._artists[akey] = {
                'Persistent ID': self.calc_id(akey),
                'Name': akey,
                'Artist': akey,
                'Track Count': 0,
                'Play Count': 0,
                'Rating': 0,
                'Genre': set(),
                'Album': set(),
                'Album Artist': set()
            }
        # Compute information; the average must use the count *before* this track
        artist['Rating'] = self.calc_average(track.get('Rating', 0), artist['Rating'], artist['Track Count'])
        artist['Track Count'] += 1
        artist['Play Count'] += track.get('Play Count', 0)
        if 'Genre' in track:
            # Split up the Genres
            artist['Genre'].update(track['Genre'].split('/'))
        if 'Album' in track:
            artist['Album'].add(track['Album'])
        # The 'Album Artist' set collects the track-level artists filed under
        # this album artist. A track may have 'Album Artist' without 'Artist'.
        if 'Album Artist' in track and 'Artist' in track:
            artist['Album Artist'].add(track['Artist'])

    def _process_album(self, track):
        """
        Merge one track into the per-album aggregates.

        Tracks without 'Album' are ignored. 'Avg Bit Rate' and 'Min Bit Rate'
        only take tracks that have a 'Bit Rate' into account; both stay 0
        while no track of the album provided one.
        NOTE(review): albums are keyed by name only, so two albums with the
        same title from different artists end up merged - confirm intended.
        """
        if 'Album' not in track:
            return
        akey = track['Album']
        album = self._albums.get(akey)
        if album is None:
            # Key is used to increment/precise some information
            # So we use album name as a key to avoid calculating an ID for each track
            album = self._albums[akey] = {
                'Persistent ID': self.calc_id(akey),
                'Name': akey,
                'Album': akey,
                'Track Count': 0,
                'Play Count': 0,
                'Genre': set(),
                'Artist': set(),
                'Avg Bit Rate': 0,
                'Min Bit Rate': 0,
                # 'Album Artist': '',
                'Total Time': 0
            }
            self._album_bitrate_samples[akey] = 0
        # Compute information
        bit_rate = track.get('Bit Rate')
        if bit_rate is not None:
            samples = self._album_bitrate_samples.get(akey, 0)
            album['Avg Bit Rate'] = self.calc_average(bit_rate, album['Avg Bit Rate'], samples)
            # The first sample replaces the 0 placeholder, later ones only lower the minimum
            if samples == 0 or bit_rate < album['Min Bit Rate']:
                album['Min Bit Rate'] = bit_rate
            self._album_bitrate_samples[akey] = samples + 1
        album['Track Count'] += 1
        album['Play Count'] += track.get('Play Count', 0)
        album['Total Time'] += track.get('Total Time', 0)
        if 'Genre' in track:
            # Split up the Genres
            album['Genre'].update(track['Genre'].split('/'))
        if 'Artist' in track:
            album['Artist'].add(track['Artist'])
        # These fields are copied as-is; the last track providing them wins
        for field in ('Album Rating', 'Album Rating Computed', 'Album Artist'):
            if field in track:
                album[field] = track[field]

    @classmethod
    def calc_average(cls, added_value, current_value, nb_values):
        """
        Calculate average value from a current value, a value to add and the number of values

        ``current_value`` is the average of the ``nb_values`` previous values;
        the result is the average once ``added_value`` is included.
        """
        return (current_value * nb_values + added_value) / (nb_values + 1)

    @classmethod
    def calc_id(cls, key):
        """
        Calculate a MD5 sum from a key as ID

        Returns the hexadecimal digest of the UTF-8 encoded key.
        """
        md5 = hashlib.md5()
        md5.update(key.encode('UTF-8'))
        return md5.hexdigest()
class WriteElsJson:
    """
    Write songs, albums and artists as line-delimited JSON files.

    Every document is written as two lines: an ``{"index": {...}}`` action
    line carrying the target index and document id, then the document itself.
    """

    @staticmethod
    def _write_bulk(documents, index_name, output_file):
        """
        Write (doc_id, document) pairs to output_file, two JSON lines per pair.

        The file is opened in binary mode and closed even when serializing a
        document raises.
        """
        with io.open(output_file, 'wb') as bulk_file:
            for doc_id, document in documents:
                action = {
                    "index": {"_index": index_name, "_id": doc_id}
                }
                bulk_file.write(bytes(json.dumps(action, indent=None, cls=SetEncoder), 'UTF-8'))
                bulk_file.write(bytes("\n", 'UTF-8'))
                bulk_file.write(bytes(json.dumps(document, indent=None, cls=SetEncoder), 'UTF-8'))
                bulk_file.write(bytes("\n", 'UTF-8'))

    @staticmethod
    def _with_rounded(document, field):
        """Return a shallow copy of document whose field value is rounded."""
        # Copy so that the caller's aggregate keeps its unrounded value
        rounded = dict(document)
        rounded[field] = round(document[field])
        return rounded

    @staticmethod
    def write_artists(artists, output_file):
        """
        Write artists data to another JSON file

        'Rating' is rounded in the written documents; the dictionaries passed
        in are left untouched.
        """
        WriteElsJson._write_bulk(
            ((artist['Persistent ID'], WriteElsJson._with_rounded(artist, 'Rating'))
             for artist in artists.values()),
            ITunesParser.ARTIST_INDEX,
            output_file)

    @staticmethod
    def write_albums(albums, output_file):
        """
        Write albums data to another JSON file

        'Avg Bit Rate' is rounded in the written documents; the dictionaries
        passed in are left untouched.
        """
        WriteElsJson._write_bulk(
            ((album['Persistent ID'], WriteElsJson._with_rounded(album, 'Avg Bit Rate'))
             for album in albums.values()),
            ITunesParser.ALBUM_INDEX,
            output_file)

    @staticmethod
    def write_songs(songs, output_file):
        """
        Write songs to a JSON

        The keys of ``songs`` are used as document ids.
        """
        WriteElsJson._write_bulk(songs.items(), ITunesParser.SONG_INDEX, output_file)
#### main block ####
# Default input & output files
DEFAULT_LIBRARY_FILE_NAME = 'iTunesLibrary.xml'
DEFAULT_OUTPUT_FILE_NAME = '/es-music-data.json'
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
# The default output file sits next to this script
_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_OUTPUT_FILE = _SCRIPT_DIR + DEFAULT_OUTPUT_FILE_NAME

# Get options
parser = argparse.ArgumentParser(description="""
Parse an iTunes XML library file to produce JSON file for ELS bulk operation.
""")
parser.add_argument(
    '-f', '--file',
    default=DEFAULT_LIBRARY_FILE,
    help='iTunes Library XML file path (default: ./{})'.format(DEFAULT_LIBRARY_FILE_NAME))
parser.add_argument(
    '-o', '--output',
    default=DEFAULT_OUTPUT_FILE,
    help='Output to file (default: .{})'.format(DEFAULT_OUTPUT_FILE_NAME))
parser.add_argument(
    '-c', '--console',
    action='store_true',
    help='Output to console instead of file')


def main():
    """
    Parse the library given on the command line and write the three JSON files.

    NOTE: the --output and --console options are parsed but not used here;
    the output file names below are fixed and relative to the working directory.
    """
    args = parser.parse_args()
    print("Parsing file '{}'...".format(args.file))
    library = ITunesParser().parse(args.file)
    print("Writing JSON files...")
    # Same order as before: songs, then artists, then albums
    jobs = (
        (WriteElsJson.write_songs, 'songs', "es-songs.json"),
        (WriteElsJson.write_artists, 'artists', "es-artists.json"),
        (WriteElsJson.write_albums, 'albums', "es-albums.json"),
    )
    for write, section, target in jobs:
        write(library[section], target)
    print('Done!')


if __name__ == '__main__':
    main()