Files
iTunes/iTunesParser.py

268 lines
8.5 KiB
Python

#!/usr/bin/env python
"""
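--> Parses the library and only produces JSON adapted for Elasticsearch.
iTunes Graph Parser
Parses an iTunes library XML file and generates JSON files. The original
description targeted the D3.js JavaScript library; the output here is
formatted as Elasticsearch bulk-request actions.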
Example Track info:
{
'Album': 'Nirvana',
'Persistent ID': 'A50FE1436726815C',
'Track Number': 4,
'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3',
'File Folder Count': 4,
'Album Rating Computed': True,
'Total Time': 134295,
'Sample Rate': 44100,
'Genre': 'Rock/Alternative',
'Bit Rate': 236,
'Kind': 'MPEG audio file',
'Name': 'Sliver',
'Artist': 'Nirvana',
'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38),
'Album Rating': 60,
'Rating': 40,
'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41),
'Library Folder Count': 1,
'Year': 2002,
'Track ID': 7459,
'Size': 3972838,
'Track Type': 'File',
'Play Count': 2,
'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00)
}
"""
import datetime
import io
import json
import os
import plistlib
import sys
import argparse
class SetEncoder(json.JSONEncoder):
    """
    JSON encoder that additionally understands sets and datetimes.
    A set is emitted as a JSON array and a datetime.datetime as its ISO 8601
    string; anything else is left to the standard encoder.
    """

    def default(self, obj):
        """
        Return a JSON-serializable replacement for obj.
        Falls back to json.JSONEncoder.default, which raises TypeError for
        types it does not know.
        """
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, set):
            return list(obj)
        return super(SetEncoder, self).default(obj)
class ITunesParser:
    """
    Parse an iTunes library and produce JSON for Elasticsearch (ELS).

    Songs are returned by to_json() as the body of a bulk request, while the
    artist and album aggregates gathered along the way are written to
    'es-artist-data.json' and 'es-albums-data.json' in the current directory.
    """

    def __init__(self, library_file):
        """
        Remember the library location and start with empty aggregates.
        library_file: path of the iTunes library XML file, or an already
        opened binary file object.
        """
        self._albums = {}
        self._artists = {}
        self.library_file = library_file

    def to_json(self):
        """
        Return the bulk-request text describing every song of the library.
        Side effect: the artist and album aggregates collected while the
        songs are processed are written to their own JSON files.
        """
        ret = self._process_songs()
        self._write_artists()
        self._write_albums()
        return ret

    def _read_tracks(self):
        """
        Read the library and return its 'Tracks' part.
        Raises KeyError when the library has no 'Tracks' entry.
        """
        source = self.library_file
        # plistlib.readPlist() was removed in Python 3.9; plistlib.load() is
        # its replacement and needs a binary file object instead of a path.
        if hasattr(source, 'read'):
            plist = plistlib.load(source)
        else:
            with io.open(source, 'rb') as handle:
                plist = plistlib.load(handle)
        return plist['Tracks']

    def _process_songs(self):
        """
        Return the song part of an ELS bulk request.
        The result is newline-delimited JSON: one "index" action line followed
        by one document line per song, each terminated by a newline. It is
        therefore not a single valid JSON document.
        Every retained track is also fed to _process_artist and
        _process_album.
        TODO Just return a _correct_ JSON and treat in another place/class
        """
        tracks = self._read_tracks()
        lines = []
        for track in tracks.values():
            # Filter out any non-music (a track without a type is skipped too)
            if track.get('Track Type') != 'File':
                continue
            if 'Podcast' in track or 'Has Video' in track:
                continue
            json_track_index = {
                "index": {
                    "_index": "itunessongs",
                    "_type": "song",
                    "_id": track['Persistent ID'],
                }
            }
            # Retrieve for each track artist information
            self._process_artist(track)
            # Retrieve for each track album information
            self._process_album(track)
            lines.append(json.dumps(json_track_index, indent=None, cls=SetEncoder))
            lines.append(json.dumps(track, indent=None, cls=SetEncoder))
        # Join once instead of growing a string inside the loop.
        return "".join(line + "\n" for line in lines)

    @staticmethod
    def _accumulate(entry, track):
        """
        Add one track's count, rating, plays and genres to an aggregate entry.
        """
        entry['count'] += 1
        # iTunes ratings are multiples of 20; store them as 0-5 stars.
        # TODO Improve rating: it is a running sum, so it is unbounded
        entry['rating'] += track.get('Rating', 0) // 20
        entry['plays'] += track.get('Play Count', 0)
        if 'Genre' in track:
            # Split up the Genres ('Rock/Alternative' -> Rock, Alternative)
            entry['genres'].update(track['Genre'].split('/'))

    def _process_artist(self, track):
        """
        Update the artist aggregate with the given track.
        Tracks without an 'Artist' are ignored. Nothing is returned; the
        result is stored in self._artists, keyed by artist name.
        """
        if 'Artist' not in track:
            return
        akey = track['Artist']
        # Add artist
        if akey not in self._artists:
            self._artists[akey] = {
                'id': len(self._artists),
                'name': akey,
                'count': 0, 'plays': 0, 'rating': 0,
                'genres': set()
            }
        self._accumulate(self._artists[akey], track)

    def _process_album(self, track):
        """
        Update the album aggregate with the given track.
        Tracks without an 'Album' are ignored. Nothing is returned; the
        result is stored in self._albums, keyed by album name.
        """
        if 'Album' not in track:
            return
        akey = track['Album']
        if akey not in self._albums:
            self._albums[akey] = {
                'id': len(self._albums),
                'name': akey,
                'count': 0, 'plays': 0, 'rating': 0,
                'genres': set(),
                'artist': set()
            }
        album = self._albums[akey]
        self._accumulate(album, track)
        # Add different artists. This must not depend on the track having a
        # genre, otherwise artists of genre-less tracks are lost.
        if 'Artist' in track:
            album['artist'].add(track['Artist'])

    @staticmethod
    def _write_bulk(path, doc_type, documents):
        """
        Write documents to path as UTF-8 ELS bulk lines of the given type.
        """
        action = json.dumps(
            {"index": {"_index": "itunessongs", "_type": doc_type}},
            indent=None, cls=SetEncoder)
        # 'with' guarantees the file is closed even if serialization fails.
        with io.open(path, 'wb') as handle:
            for document in documents:
                body = json.dumps(document, indent=None, cls=SetEncoder)
                handle.write((action + "\n").encode('UTF-8'))
                handle.write((body + "\n").encode('UTF-8'))

    def _write_artists(self):
        """
        Write artists data to another JSON file ('es-artist-data.json')
        """
        self._write_bulk('es-artist-data.json', 'artist', self._artists.values())

    def _write_albums(self):
        """
        Write albums data to another JSON file ('es-albums-data.json')
        """
        self._write_bulk('es-albums-data.json', 'album', self._albums.values())
#### main block ####
# Default input & output files.
# The output file name keeps its leading '/' because it is appended as-is to
# the directory that contains this script.
DEFAULT_LIBRARY_FILE_NAME = 'iTunesLibrary.xml'
DEFAULT_OUTPUT_FILE_NAME = '/es-music-data.json'
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
DEFAULT_OUTPUT_FILE = ''.join(
    (os.path.dirname(os.path.realpath(__file__)), DEFAULT_OUTPUT_FILE_NAME)
)

# Command-line options
parser = argparse.ArgumentParser()
parser.add_argument(
    '-f', '--file',
    default=DEFAULT_LIBRARY_FILE,
    help='iTunes Library XML file path (default: ./{})'.format(DEFAULT_LIBRARY_FILE_NAME),
)
parser.add_argument(
    '-o', '--output',
    default=DEFAULT_OUTPUT_FILE,
    help='Output to file (default: .{})'.format(DEFAULT_OUTPUT_FILE_NAME),
)
parser.add_argument(
    '-c', '--console',
    action='store_true',
    help='Output to console instead of file',
)
# parser.add_argument('-v', '--verbose', action='store_true',
# help='Verbose output')

if __name__ == '__main__':
    args = parser.parse_args()
    output = ITunesParser(args.file).to_json()
    if args.console:
        print(output)
    else:
        with io.open(args.output, 'wb') as outfile:
            # The file is opened in binary mode, so the text is converted to
            # bytes with the constructor matching the running interpreter.
            major = sys.version_info.major
            if major == 3:
                outfile.write(bytes(output, 'UTF-8'))
            elif major == 2:
                outfile.write(bytes(output))
        print('JSON data written to: ' + args.output)