This commit is contained in:
2025-10-22 10:09:41 +02:00
parent 93b3632bb4
commit ce680beeb3

View File

@@ -42,14 +42,13 @@ for use in the D3.js JavaScript library.
""" """
import argparse
import datetime import datetime
import hashlib
import io import io
import json import json
import os import os
import plistlib import plistlib
import sys
import argparse
import hashlib
class SetEncoder(json.JSONEncoder): class SetEncoder(json.JSONEncoder):
@@ -64,11 +63,12 @@ class SetEncoder(json.JSONEncoder):
class ITunesParser: class ITunesParser:
""" """
Parse an iTunes Library and produce JSON - for ELS Parse an iTunes Library and produce JSON - for ELS
""" """
SONG_INDEX = 'itunes-songs'
ALBUM_INDEX = 'itunes-albums' SONG_INDEX = "itunes-songs"
ARTIST_INDEX = 'itunes-artists' ALBUM_INDEX = "itunes-albums"
ARTIST_INDEX = "itunes-artists"
# TODO: Put these variables in a config file or in a Python library
def __init__(self): def __init__(self):
@@ -78,172 +78,174 @@ class ITunesParser:
def _read_tracks(self, library_file): def _read_tracks(self, library_file):
""" """
Read library file and return Tracks key of dict. Read library file and return Tracks key of dict.
Dict may contains Dict may contains
- Major Version - Major Version
- Minor Version - Minor Version
- Date - Date
- Application Version - Application Version
- Features - Features
- Show Content Ratings - Show Content Ratings
- Music Folder - Music Folder
- Library Persistent ID - Library Persistent ID
- Tracks - Tracks
- ... - ...
""" """
plist = plistlib.load(open(library_file, 'rb')) plist = plistlib.load(open(library_file, "rb"))
return plist['Tracks'] return plist["Tracks"]
def parse(self, library_file): def parse(self, library_file):
""" """
Return an output JSON for an ELS Bulk request - Not a correct format Return an output JSON for an ELS Bulk request - Not a correct format
This method call process_album & process_artist This method call process_album & process_artist
TODO Just return a _correct_ JSON and treat in another place/class TODO Just return a _correct_ JSON and treat in another place/class
""" """
tracks = self._read_tracks(library_file) tracks = self._read_tracks(library_file)
for _, track in tracks.items(): for _, track in tracks.items():
# Filter out any non-music # Filter out any non-music
if track['Track Type'] != 'File': if track["Track Type"] != "File":
continue continue
if 'Podcast' in track or 'Has Video' in track: if "Podcast" in track or "Has Video" in track:
continue continue
# Each keeped track are stored # Each keeped track are stored
self._tracks[track['Persistent ID']] = track self._tracks[track["Persistent ID"]] = track
# Retrieve for each track artist information # Retrieve for each track artist information
self._process_artist(track) self._process_artist(track)
# Retrieve for each track album information # Retrieve for each track album information
self._process_album(track) self._process_album(track)
ret = { ret = {"songs": self._tracks, "albums": self._albums, "artists": self._artists}
'songs': self._tracks,
'albums': self._albums,
'artists': self._artists
}
return ret return ret
def _process_artist(self, track): def _process_artist(self, track):
""" """
Process artists in the track part of library and return a JSON formated for a bulk ELS request Process artists in the track part of library and return a JSON formated for a bulk ELS request
""" """
if 'Album Artist' not in track and 'Artist' not in track: if "Album Artist" not in track and "Artist" not in track:
return return
akey = track['Album Artist'] if 'Album Artist' in track else track['Artist'] akey = track["Album Artist"] if "Album Artist" in track else track["Artist"]
# Add artist # Add artist
if akey not in self._artists: if akey not in self._artists:
a_id = self.calc_id(akey) a_id = self.calc_id(akey)
# Key is used to increment/precise some information # Key is used to increment/precise some information
# So we use artist name as a key to avoid calculating an ID for each track # So we use artist name as a key to avoid calculating an ID for each track
self._artists[akey] = { self._artists[akey] = {
'Persistent ID': a_id, "Persistent ID": a_id,
'Name': akey, "Name": akey,
'Artist': akey, "Artist": akey,
'Track Count': 0, "Track Count": 0,
'Play Count': 0, "Play Count": 0,
'Rating': 0, "Rating": 0,
'Genre': set(), "Genre": set(),
'Album': set() "Album": set(),
} }
# Compute information # Compute information
play_count = track['Play Count'] if 'Play Count' in track else 0 play_count = track["Play Count"] if "Play Count" in track else 0
rating = track['Rating'] if 'Rating' in track else 0 rating = track["Rating"] if "Rating" in track else 0
rating = self.calc_average(rating, self._artists[akey]['Rating'], self._artists[akey]['Track Count']) rating = self.calc_average(
rating, self._artists[akey]["Rating"], self._artists[akey]["Track Count"]
)
self._artists[akey]['Track Count'] += 1 self._artists[akey]["Track Count"] += 1
self._artists[akey]['Rating'] = rating self._artists[akey]["Rating"] = rating
self._artists[akey]['Play Count'] += play_count self._artists[akey]["Play Count"] += play_count
if 'Genre' in track: if "Genre" in track:
# Split up the Genres # Split up the Genres
genre_parts = track['Genre'].split('/') genre_parts = track["Genre"].split("/")
self._artists[akey]['Genre'] |= set(genre_parts) self._artists[akey]["Genre"] |= set(genre_parts)
if 'Album' in track: if "Album" in track:
self._artists[akey]['Album'].add(track['Album']) self._artists[akey]["Album"].add(track["Album"])
def _process_album(self, track): def _process_album(self, track):
""" """
Process albums in the track part of library and return a JSON formated for a bulk ELS request Process albums in the track part of library and return a JSON formated for a bulk ELS request
""" """
if 'Album' not in track: if "Album" not in track:
return return
akey = track['Album'] akey = track["Album"]
if akey not in self._albums: if akey not in self._albums:
a_id = self.calc_id(akey) a_id = self.calc_id(akey)
# Key is used to increment/precise some information # Key is used to increment/precise some information
# So we use album name as a key to avoid calculating an ID for each track # So we use album name as a key to avoid calculating an ID for each track
self._albums[akey] = { self._albums[akey] = {
'Persistent ID': a_id, "Persistent ID": a_id,
'Name': akey, "Name": akey,
'Album': akey, "Album": akey,
'Track Count': 0, "Track Count": 0,
'Play Count': 0, "Play Count": 0,
'Genre': set(), "Genre": set(),
'Artist': set(), "Artist": set(),
'Avg Bit Rate': track['Bit Rate'], "Avg Bit Rate": track["Bit Rate"],
'Min Bit Rate': track['Bit Rate'], "Min Bit Rate": track["Bit Rate"],
# 'Album Artist': '', # 'Album Artist': '',
'Total Time': 0, "Total Time": 0,
'Location': '' "Location": "",
} }
# Compute information # Compute information
play_count = track['Play Count'] if 'Play Count' in track else 0 play_count = track["Play Count"] if "Play Count" in track else 0
total_time = track['Total Time'] if 'Total Time' in track else 0 total_time = track["Total Time"] if "Total Time" in track else 0
avg_bitrate = self.calc_average(track['Bit Rate'], self._albums[akey]['Avg Bit Rate'], self._albums[akey]['Track Count']) avg_bitrate = self.calc_average(
track["Bit Rate"],
self._albums[akey]["Avg Bit Rate"],
self._albums[akey]["Track Count"],
)
self._albums[akey]['Avg Bit Rate'] = avg_bitrate self._albums[akey]["Avg Bit Rate"] = avg_bitrate
self._albums[akey]['Track Count'] += 1 self._albums[akey]["Track Count"] += 1
self._albums[akey]['Play Count'] += play_count self._albums[akey]["Play Count"] += play_count
self._albums[akey]['Total Time'] += total_time self._albums[akey]["Total Time"] += total_time
self._albums[akey]['Location'] = os.path.dirname(track['Location']) self._albums[akey]["Location"] = os.path.dirname(track["Location"])
if self._albums[akey]['Min Bit Rate'] > track['Bit Rate']: if self._albums[akey]["Min Bit Rate"] > track["Bit Rate"]:
self._albums[akey]['Min Bit Rate'] = track['Bit Rate'] self._albums[akey]["Min Bit Rate"] = track["Bit Rate"]
if 'Genre' in track: if "Genre" in track:
# Split up the Genres # Split up the Genres
genre_parts = track['Genre'].split('/') genre_parts = track["Genre"].split("/")
self._albums[akey]['Genre'] |= set(genre_parts) self._albums[akey]["Genre"] |= set(genre_parts)
if 'Artist' in track: if "Artist" in track:
self._albums[akey]['Artist'].add(track['Artist']) self._albums[akey]["Artist"].add(track["Artist"])
if 'Album Rating' in track: if "Album Rating" in track:
self._albums[akey]['Album Rating'] = track['Album Rating'] self._albums[akey]["Album Rating"] = track["Album Rating"]
if 'Album Rating Computed' in track: if "Album Rating Computed" in track:
self._albums[akey]['Album Rating Computed'] = track['Album Rating Computed'] self._albums[akey]["Album Rating Computed"] = track["Album Rating Computed"]
if 'Album Artist' in track: if "Album Artist" in track:
self._albums[akey]['Album Artist'] = track['Album Artist'] self._albums[akey]["Album Artist"] = track["Album Artist"]
@classmethod @classmethod
def calc_average(cls, added_value, current_value, nb_values): def calc_average(cls, added_value, current_value, nb_values):
""" """
Calculate average value from a current value, a value to add and the number of values Calculate average value from a current value, a value to add and the number of values
""" """
return (current_value * nb_values + added_value) / (nb_values + 1) return (current_value * nb_values + added_value) / (nb_values + 1)
@classmethod @classmethod
def calc_id(cls, key): def calc_id(cls, key):
""" """
Calculate a MD5 sum from a key as ID Calculate a MD5 sum from a key as ID
""" """
md5 = hashlib.md5() md5 = hashlib.md5()
md5.update(key.encode('UTF-8')) md5.update(key.encode("UTF-8"))
return md5.hexdigest() return md5.hexdigest()
@@ -251,13 +253,13 @@ class WriteElsJson:
@staticmethod
def write_artists(artists, output_file):
    """
    Write artists data to a JSON file, one artist document per line.

    artists: dict of artist entries; only its values are written
    output_file: path of the file to write (opened in 'wb' mode)

    The 'Rating' of each artist is rounded in the written document only;
    the entries passed in are not modified.
    The ELS bulk action line ({"index": {...}}) is not written.
    """
    # The context manager closes the file even when serialisation fails
    with open(output_file, "wb") as file_artist:
        for artist in artists.values():
            # Overriding an existing key keeps its position, so the layout
            # of the written document does not change
            document = {**artist, "Rating": round(artist["Rating"])}
            line = json.dumps(document, indent=None, cls=SetEncoder)
            file_artist.write(line.encode("UTF-8") + b"\n")
@staticmethod
def write_albums(albums, output_file):
    """
    Write albums data to a JSON file, one album document per line.

    albums: dict of album entries; only its values are written
    output_file: path of the file to write (opened in 'wb' mode)

    The 'Avg Bit Rate' of each album is rounded in the written document
    only; the entries passed in are not modified.
    The ELS bulk action line ({"index": {...}}) is not written.
    """
    # The context manager closes the file even when serialisation fails
    with open(output_file, "wb") as file_albums:
        for album in albums.values():
            # Overriding an existing key keeps its position, so the layout
            # of the written document does not change
            document = {**album, "Avg Bit Rate": round(album["Avg Bit Rate"])}
            line = json.dumps(document, indent=None, cls=SetEncoder)
            file_albums.write(line.encode("UTF-8") + b"\n")
@staticmethod
def write_songs(songs, output_file):
    """
    Write songs to a JSON file, one song document per line.

    songs: dict of song entries; only its values are written
    output_file: path of the file to write (opened in 'wb' mode)

    The ELS bulk action line ({"index": {...}}) is not written.
    """
    # The context manager closes the file even when serialisation fails
    with open(output_file, "wb") as file:
        for song in songs.values():
            line = json.dumps(song, indent=None, cls=SetEncoder)
            file.write(line.encode("UTF-8") + b"\n")
#### main block ####

# Default names of the input library and of the output file
DEFAULT_LIBRARY_FILE_NAME = "iTunesLibrary.xml"
DEFAULT_OUTPUT_FILE_NAME = "/es-music-data.json"

# The library is looked up as given; the output goes next to this script
DEFAULT_LIBRARY_FILE = os.path.expanduser(DEFAULT_LIBRARY_FILE_NAME)
_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_OUTPUT_FILE = _SCRIPT_DIR + DEFAULT_OUTPUT_FILE_NAME

# Command line options
parser = argparse.ArgumentParser(
    description="""
    Parse an iTunes XML library file to produce JSON file for ELS bulk operation.
    """
)
parser.add_argument(
    "-f",
    "--file",
    default=DEFAULT_LIBRARY_FILE,
    help=f"iTunes Library XML file path (default: ./{DEFAULT_LIBRARY_FILE_NAME})",
)
parser.add_argument(
    "-o",
    "--output",
    default=DEFAULT_OUTPUT_FILE,
    help=f"Output to file (default: .{DEFAULT_OUTPUT_FILE_NAME})",
)
parser.add_argument(
    "-c",
    "--console",
    action="store_true",
    help="Output to console instead of file",
)
# parser.add_argument('-v', '--verbose', action='store_true',
#                     help='Verbose output')
if __name__ == "__main__":
    args = parser.parse_args()

    print(f"Parsing file '{args.file}'...")
    library = ITunesParser().parse(args.file)

    print("Writing JSON files...")
    # NOTE: args.output and args.console are parsed but not used here; the
    # output file names below are fixed
    exports = (
        (WriteElsJson.write_songs, "songs", "es-songs.jsonl"),
        (WriteElsJson.write_artists, "artists", "es-artists.jsonl"),
        (WriteElsJson.write_albums, "albums", "es-albums.jsonl"),
    )
    for write, section, file_name in exports:
        write(library[section], file_name)

    print("Done!")

    # if args.console:
    #     print(output)