Files
iTunes/iTunesParser.py

370 lines
12 KiB
Python

#!/usr/bin/env python
"""
Parse iTunes library and produce JSON adapted files to send to Elasticsearch
Rating note:
For albums and artists data, 'Rating' is the average rating over *all* songs in the album or by the artist.
So, if an album has 10 rated songs and 2 unrated songs, 'Rating' is the sum of the ratings divided by 12.
TODO: Add information to store the number of rated songs, and a 'Rating' computed over rated songs only.
Parses an iTunes library XML file and generates a JSON file
for use in the D3.js JavaScript library.
Example Track info:
{
'Album': 'Nirvana',
'Persistent ID': 'A50FE1436726815C',
'Track Number': 4,
'Location': 'file://localhost/Users/foo/Music/iTunes/iTunes%20Music/Nirvana/Nirvana/04%20Sliver.mp3',
'File Folder Count': 4,
'Album Rating Computed': True,
'Total Time': 134295,
'Sample Rate': 44100,
'Genre': 'Rock/Alternative',
'Bit Rate': 236,
'Kind': 'MPEG audio file',
'Name': 'Sliver',
'Artist': 'Nirvana',
'Date Added': datetime.datetime(2006, 10, 11, 4, 31, 38),
'Album Rating': 60,
'Rating': 40,
'Date Modified': datetime.datetime(2009, 7, 18, 4, 57, 41),
'Library Folder Count': 1,
'Year': 2002,
'Track ID': 7459,
'Size': 3972838,
'Track Type': 'File',
'Play Count': 2,
'Play Date UTC': datetime.datetime(2009, 7, 18, 5, 00, 00)
}
"""
import argparse
import datetime
import hashlib
import io
import json
import os
import plistlib
class JsonCustomEncoder(json.JSONEncoder):
    """
    JSON encoder that also understands sets and datetimes.

    - ``set`` / ``frozenset`` values are emitted as JSON arrays. Elements are
      sorted when they are mutually orderable so that the generated files are
      identical from one run to the next (plain set iteration order depends on
      string hashing, which varies between interpreter runs).
    - ``datetime.datetime`` values are emitted as ISO 8601 strings.
    """

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for *obj*.

        Raises TypeError (through the base class) for unsupported types.
        """
        if isinstance(obj, (set, frozenset)):
            try:
                return sorted(obj)
            except TypeError:
                # Mixed, non-comparable element types: keep iteration order.
                return list(obj)
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return super().default(obj)
class ITunesParser:
    """
    Parse an iTunes library XML file and aggregate it for ELS.

    Kept tracks are stored unchanged, keyed by their 'Persistent ID'.
    Artist and album records are built incrementally, one track at a time,
    and keyed by the MD5 hex digest of the artist / album name.
    """

    def __init__(self):
        self._tracks = {}
        self._albums = {}
        self._artists = {}
        # Per album: how many processed tracks carried a 'Bit Rate'. Needed so
        # that tracks without a bit rate do not skew 'Avg Bit Rate'.
        self._album_bit_rate_counts = {}

    def _read_tracks(self, library_file):
        """
        Read the library file and return the value of its 'Tracks' key.

        The top-level dict may contain
        - Major Version
        - Minor Version
        - Date
        - Application Version
        - Features
        - Show Content Ratings
        - Music Folder
        - Library Persistent ID
        - Tracks
        - ...

        Raises KeyError if the library has no 'Tracks' entry.
        """
        # Context manager so the file handle is closed even if parsing fails.
        with open(library_file, "rb") as library:
            plist = plistlib.load(library)
        return plist["Tracks"]

    def parse(self, library_file) -> dict:
        """
        Parse *library_file* and return the aggregated data.

        Returns a dict with the keys 'songs', 'albums' and 'artists', each
        mapping an ID to a record dict. The records may contain sets and
        datetimes, so they need a suitable encoder to be dumped as JSON.

        TODO Just return a _correct_ JSON and treat in another place/class
        """
        tracks = self._read_tracks(library_file)
        for track in tracks.values():
            # Filter out any non-music; entries without a 'Track Type' are
            # skipped instead of raising KeyError.
            if track.get("Track Type") != "File":
                continue
            if "Podcast" in track or "Has Video" in track:
                continue
            # Every kept track is stored as-is
            self._tracks[track["Persistent ID"]] = track
            # Fold the track into its artist and album records
            self._process_artist(track)
            self._process_album(track)
        return {
            "songs": self._tracks,
            "albums": self._albums,
            "artists": self._artists,
        }

    def _process_artist(self, track):
        """
        Fold one track into the record of its artist (updates self._artists).

        'Album Artist' is preferred over 'Artist'; a track with neither is
        ignored. 'Rating' is a running average over all tracks of the artist,
        a track without 'Rating' counting as 0.
        """
        if "Album Artist" in track:
            akey = track["Album Artist"]
        elif "Artist" in track:
            akey = track["Artist"]
        else:
            return
        persistent_id = self.calc_id(akey)
        if persistent_id not in self._artists:
            # First track seen for this artist: create an empty record
            self._artists[persistent_id] = {
                "Persistent ID": persistent_id,
                "Name": akey,
                "Artist": akey,
                "Track Count": 0,
                "Play Count": 0,
                "Rating": 0,
                "Genre": set(),
                "Album": set(),
            }
        artist = self._artists[persistent_id]
        # The average must be computed before 'Track Count' is incremented
        artist["Rating"] = self.calc_average(
            track.get("Rating", 0), artist["Rating"], artist["Track Count"]
        )
        artist["Track Count"] += 1
        artist["Play Count"] += track.get("Play Count", 0)
        if "Genre" in track:
            # Split up the Genres ("Rock/Alternative" -> "Rock", "Alternative")
            artist["Genre"] |= set(track["Genre"].split("/"))
        if "Album" in track:
            artist["Album"].add(track["Album"])

    def _process_album(self, track):
        """
        Fold one track into the record of its album (updates self._albums).

        A track without 'Album' is ignored. Tracks lacking 'Bit Rate',
        'Date Added' or 'Location' are still counted; they simply do not
        contribute to the corresponding album fields, which stay None (or ""
        for 'Location') until a track provides a value.

        NOTE: the album ID is derived from the album name only, so albums
        sharing a name are merged into one record.
        """
        if "Album" not in track:
            return
        akey = track["Album"]
        persistent_id = self.calc_id(akey)
        if persistent_id not in self._albums:
            # First track seen for this album: create the record
            self._albums[persistent_id] = {
                "Persistent ID": persistent_id,
                "Name": akey,
                "Album": akey,
                "Track Count": 0,
                "Play Count": 0,
                "Genre": set(),
                "Artist": set(),
                "Avg Bit Rate": None,
                "Min Bit Rate": None,
                # 'Album Artist': '',
                "Total Time": 0,
                "Location": "",
                "Date Added": track.get("Date Added"),
            }
        album = self._albums[persistent_id]

        bit_rate = track.get("Bit Rate")
        if bit_rate is not None:
            # Average only over the tracks that actually have a bit rate
            seen = self._album_bit_rate_counts.get(persistent_id, 0)
            album["Avg Bit Rate"] = self.calc_average(
                bit_rate, album["Avg Bit Rate"] or 0, seen
            )
            self._album_bit_rate_counts[persistent_id] = seen + 1
            if album["Min Bit Rate"] is None or bit_rate < album["Min Bit Rate"]:
                album["Min Bit Rate"] = bit_rate

        album["Track Count"] += 1
        album["Play Count"] += track.get("Play Count", 0)
        album["Total Time"] += track.get("Total Time", 0)
        if "Location" in track:
            # Last track with a location wins
            album["Location"] = os.path.dirname(track["Location"])
        if "Genre" in track:
            # Split up the Genres
            album["Genre"] |= set(track["Genre"].split("/"))
        if "Artist" in track:
            album["Artist"].add(track["Artist"])
        if "Album Rating" in track:
            album["Album Rating"] = track["Album Rating"]
        if "Album Rating Computed" in track:
            album["Album Rating Computed"] = track["Album Rating Computed"]
        if "Album Artist" in track:
            album["Album Artist"] = track["Album Artist"]

        # Keep the earliest 'Date Added' among the tracks of the album
        date_added = track.get("Date Added")
        if date_added is not None:
            if album["Date Added"] is None:
                album["Date Added"] = date_added
            elif date_added < album["Date Added"]:
                print(f"Lower date for {akey} - {track['Album']}")
                album["Date Added"] = date_added

    @classmethod
    def calc_average(cls, added_value, current_value, nb_values):
        """
        Return the new running average after adding one value.

        *current_value* is the average of the *nb_values* values seen so far
        and *added_value* is the value to fold in. The result is a float.
        """
        return (current_value * nb_values + added_value) / (nb_values + 1)

    @classmethod
    def calc_id(cls, key):
        """
        Return the MD5 hex digest of *key* (UTF-8 encoded), used as record ID.
        """
        md5 = hashlib.md5()
        md5.update(key.encode("UTF-8"))
        return md5.hexdigest()
class WriteElsJson:
@staticmethod
def write_elements(
elements: list,
element_type: str,
json_style: str,
):
"""
Write songs to a JSON
"""
output_filename = f"es-{element_type}.{json_style}"
if json_style == "els":
output_filename += ".json"
json_dump_option = {"ensure_ascii": False, "cls": JsonCustomEncoder}
with open(output_filename, "w", encoding="utf-8") as ofile:
match json_style:
case "json":
json_str = json.dumps(elements, **json_dump_option)
ofile.write(
json_str.replace("}, {", "},\n{")
) # One line = one record
case "jsonl":
for el in elements:
json.dump(el, ofile, **json_dump_option)
ofile.write("\n")
case "els":
for el in elements:
json_track_index = {
"index": {
"_index": f"itunes-{element_type}",
"_id": el["Persistent ID"],
}
}
json.dump(json_track_index, ofile, **json_dump_option)
ofile.write("\n")
json.dump(el, ofile, **json_dump_option)
ofile.write("\n")
case _:
print("ERROR: no write format")
#### command-line configuration ####
# Default input & output files. The output name keeps its leading "/" because
# it is appended directly to the directory holding this script.
DEFAULT_LIBRARY_FILE_NAME = "iTunesLibrary.xml"
DEFAULT_OUTPUT_FILE_NAME = "/es-music-data.json"
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
DEFAULT_OUTPUT_FILE = (
    f"{os.path.dirname(os.path.realpath(__file__))}{DEFAULT_OUTPUT_FILE_NAME}"
)

# Command-line options
parser = argparse.ArgumentParser(
    description=(
        "\n"
        "Parse an iTunes XML library file to produce JSON file for ELS bulk operation."
        "\n"
    )
)
parser.add_argument(
    "-f",
    "--file",
    default=DEFAULT_LIBRARY_FILE,
    help=f"iTunes Library XML file path (default: ./{DEFAULT_LIBRARY_FILE_NAME})",
)
parser.add_argument(
    "-o",
    "--output",
    default=DEFAULT_OUTPUT_FILE,
    help=f"Output to file (default: .{DEFAULT_OUTPUT_FILE_NAME})",
)
parser.add_argument(
    "-c",
    "--console",
    action="store_true",
    help="Output to console instead of file",
)
parser.add_argument(
    "-F",
    "--format",
    default="json",
    choices=["json", "jsonl", "els"],
    help="Choose JSON style",
)
# A -v/--verbose switch is sketched here but not enabled:
# parser.add_argument('-v', '--verbose', action='store_true',
# help='Verbose output')
if __name__ == "__main__":
    args = parser.parse_args()

    print(f"Parsing file '{args.file}'...")
    itunes_parser = ITunesParser().parse(args.file)

    print("Writing JSON files...")
    # One output file per record family: songs first, then artists, then albums.
    for element_type in ("songs", "artists", "albums"):
        WriteElsJson.write_elements(
            list(itunes_parser[element_type].values()),
            element_type,
            args.format,
        )
    print("Done!")
    # NOTE(review): args.output and args.console are never read here, so the
    # -o/--output and -c/--console options currently have no effect.