Files
iTunes/send_data.py

372 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Send JSON files to ELS
"""
import sys
import argparse
import json
import time
import requests
from suggester import process_file
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Default file names
SONG_FILE = 'es-songs.json'
ALBUM_FILE = 'es-albums.json'
ARTIST_FILE = 'es-artists.json'
MAPPING_SONGS_FILE = 'mapping.songs.json'
MAPPING_ARTISTS_FILE = 'mapping.artists.json'
MAPPING_ALBUMS_FILE = 'mapping.albums.json'
MAPPING_SUGGEST_FILE = 'mapping.suggest.json'
SONG_INDEX = 'itunes-songs'
ALBUM_INDEX = 'itunes-albums'
ARTIST_INDEX = 'itunes-artists'
SUGGEST_INDEX = 'itunes-suggest'
# TODO Put variables in a config files or in a python library
# Global values / set as default values
ELASTICSEARCH_URL = 'http://localhost:9200/'
# Why global variable ?
# Because if I want to use a class to do post/put operation, I will use class or instance variables
def main():
"""
Main function
"""
global ELASTICSEARCH_URL
args = create_args_parser().parse_args()
if args.ALL and args.no_song:
print(__file__ + ': error: argument -A/--ALL: not allowed with argument --no-song')
sys.exit(-1)
# Overloaded setting value
ELASTICSEARCH_URL = args.elasticsearch_url
# TODO Improvement: check first line of file(s) to get index
if not args.quiet:
print("*** Settings values ***")
print("Elasticsearch URL:\t" + ELASTICSEARCH_URL)
print()
else:
print("Processing...")
#TODO Detect if index doesn't exist
#TODO Use log instead print
check_is_ok = []
# Send song data
if not args.no_song:
if args.DELETE:
mapping_song = load_file(args.mapping_song, MAPPING_SONGS_FILE)
if not args.quiet:
print("Mapping of song index file: '{}'".format(mapping_song.name))
delete_index(SONG_INDEX, args.quiet)
put_mapping(SONG_INDEX, mapping_song, args.quiet)
song_file = load_file(args.song_file, SONG_FILE)
if not args.quiet:
print("Song file: '{}'".format(song_file.name))
send_data(song_file, args.quiet)
check = check_all_data_is_saved(song_file, SONG_INDEX, args.quiet)
check_is_ok.append(check)
# ? Improvment: allow to stop script if all data not sent?
if not args.quiet:
print()
else:
print('Songs sent')
elif not args.quiet:
print("Sending songs is disabled.")
print()
# Send artist data
if args.artist_file or args.ALL:
if args.DELETE:
mapping_artist = load_file(args.mapping_artist, MAPPING_ARTISTS_FILE)
if not args.quiet:
print("Mapping of artist index file: '{}'".format(mapping_artist.name))
delete_index(ARTIST_INDEX, args.quiet)
put_mapping(ARTIST_INDEX, mapping_artist, args.quiet)
artist_file = args.artist_file
if not artist_file:
if not args.quiet:
print('No artist file specified, take default file...')
artist_file = open(ARTIST_FILE, 'r')
if not args.quiet:
print("Artist file: '{}'".format(artist_file.name))
send_data(artist_file, args.quiet)
check = check_all_data_is_saved(artist_file, ARTIST_INDEX, args.quiet)
check_is_ok.append(check)
if not args.quiet:
print()
else:
print('Artist sent')
if args.album_file or args.ALL:
if args.DELETE:
mapping_album = load_file(args.mapping_album, MAPPING_ALBUMS_FILE)
if not args.quiet:
print("Mapping of artist index file: '{}'".format(mapping_album.name))
delete_index(ALBUM_INDEX, args.quiet)
put_mapping(ALBUM_INDEX, mapping_album, args.quiet)
album_file = args.album_file
if not album_file:
if not args.quiet:
print('No album file specified, take default file...')
album_file = open(ALBUM_FILE, 'r')
if not args.quiet:
print("Take file '{}' to send song data".format(album_file.name))
send_data(album_file, args.quiet)
check = check_all_data_is_saved(album_file, ALBUM_INDEX, args.quiet)
check_is_ok.append(check)
if not args.quiet:
print()
else:
print('Album sent')
if not args.no_suggest:
print("Process suggestion:")
if args.DELETE:
delete_index(SUGGEST_INDEX, args.quiet)
if not args.ALL and not args.album_file and not args.artist_file:
print('Only song file processed. No suggestion to process.')
else:
if args.DELETE:
mapping_suggest = load_file(args.mapping_suggest, MAPPING_SUGGEST_FILE)
if not args.quiet:
print("Mapping of suggest index file: '{}'".format(mapping_suggest.name))
put_mapping(SUGGEST_INDEX, mapping_suggest, args.quiet)
suggs_docs = 0
if args.album_file or args.ALL:
suggs_docs += process_file(ALBUM_FILE, 'Album')
print('Created suggestion documents: ' + str(suggs_docs))
if args.artist_file or args.ALL:
suggs_docs += process_file(ARTIST_FILE, 'Artist', 'Album Artist')
print('Created suggestion documents: ' + str(suggs_docs))
print("I'm done!")
if check_is_ok.count(False) > 0:
print('Some problems occurs')
sys.exit(check_is_ok.count(False))
def load_file(args_file, default_file):
"""
If args file in None, open default file
"""
if not args_file:
try:
final_file = open(default_file, 'r')
except FileNotFoundError as error:
print(error)
print("Default file not found.\nUse -sf argument, or -h for more help")
sys.exit(2)
else:
final_file = args_file
return final_file
def create_args_parser():
"""
Create parser with all options, default values, etc.
Return the parser ready to parse args
"""
# TODO rewrit description with multi-index phylosophie
parser = argparse.ArgumentParser(
description='''
Send JSON files formated for bulk Elasticsearch operation to an Elasticsearch.
By default: send only song data. See option to send album/artist/suggest data.
Create index if -D option activated with a mapping file (see -map).
It's cumulative! If you want to remove songs/artits/albums, you have to delete and re-create the index (use -D option).''',
formatter_class=argparse.RawTextHelpFormatter
)
# Bulk
parser.add_argument('-q', '--quiet', action='store_true',
help="Disable main output")
# Choose what to enable for send and files
sending_group = parser.add_argument_group("Sending options")
song_group = sending_group.add_mutually_exclusive_group()
song_group.add_argument('-sf', '--song-file', type=argparse.FileType('r'),
help='Song file data to send (default: \'{}\').'.format(SONG_FILE))
sending_group.add_argument('-al', '--album-file', nargs='?', type=argparse.FileType('r'), const=ALBUM_FILE,
help='Enable sending album data. Optionally, precise the album data file (default: \'{}\')'
.format(ALBUM_FILE))
sending_group.add_argument('-ar', '--artist-file', nargs='?', type=argparse.FileType('r'), const=ARTIST_FILE,
help='Enable sending artist data. Optionally, precise the artist data file (default: \'{}\')'
.format(ARTIST_FILE))
# Mode
mode_group = parser.add_argument_group('Mode')
mode_group.add_argument('-A', '--ALL', action='store_true',
help='Send all possible data: song, artist, album and suggest. Use default file if not specified')
mode_group.add_argument('-D', '--DELETE', action='store_true',
help='Delete index and create new. See -map arguement to set mapping file')
mode_group.add_argument('--no-song', action='store_true',
help='''Disable sending song data.
Not allowed with -A option.''')
mode_group.add_argument('--no-suggest', action='store_true',
help='Disable sending suggest data. Allowed with -A option')
# Mapping
mapping_group = parser.add_argument_group('Mapping files')
# CAUTION default values cannot be used because they necessarily activate the option
# QUESTION Use a for with a list of default mapping file?
mapping_group.add_argument('-ms', '--mapping-song', type=argparse.FileType('r'), const=MAPPING_SONGS_FILE, nargs='?',
help='Mapping file for songs (default: \'{}\')'.format(MAPPING_SONGS_FILE))
mapping_group.add_argument('-mr', '--mapping-artist', type=argparse.FileType('r'), const=ARTIST_FILE, nargs='?',
help='Mapping file for artists (default: \'{}\')'.format(MAPPING_ARTISTS_FILE))
mapping_group.add_argument('-ml', '--mapping-album', type=argparse.FileType('r'), const=MAPPING_ALBUMS_FILE, nargs='?',
help='Mapping file for albums (default: \'{}\')'.format(MAPPING_ALBUMS_FILE))
mapping_group.add_argument('-mg', '--mapping-suggest', type=argparse.FileType('r'), const=MAPPING_SUGGEST_FILE, nargs='?',
help='Mapping file for suggest (default: \'{}\')'.format(MAPPING_SUGGEST_FILE))
# Global Settings
g_settings_group = parser.add_argument_group('Global Settings')
g_settings_group.add_argument('-els', '--elasticsearch-url', default=ELASTICSEARCH_URL, nargs='?',
help="Elasticsearch URL.")
return parser
def send_data(file, quiet=False):
"""
Send a data bulk file to ELS.
'file' should be (readable) file object.
"""
if not quiet:
print("Sending '{}' data file...".format(file.name))
res = requests.post(url=ELASTICSEARCH_URL + '_bulk',
data=file,
headers={'Content-Type': 'application/x-ndjson'})
if res.status_code != 200:
print("An error occured")
print(res.text)
else:
if not quiet:
print(bcolors.OKGREEN + "File '{}' sent to Elasticsearch!".format(file.name) + bcolors.ENDC)
def delete_index(index_name, quiet=False):
"""
Delete an index in ELS
"""
if not quiet:
print('Deleting index \'{}\'...'.format(index_name))
res = requests.delete(url=ELASTICSEARCH_URL + index_name)
if res.status_code == 200:
if not quiet:
print(bcolors.OKGREEN + "Index '{}' deleted!".format(index_name) + bcolors.ENDC)
else:
print(bcolors.FAIL + "An error occured" + bcolors.ENDC)
if res.json()['error']['type'] == 'index_not_found_exception':
print("Index '{}' doesn't exist and can't be deleted".format(index_name))
else:
print(res.text)
def put_mapping(index_name, mapping_file, quiet=False):
"""
Send a mapping file for an index to ELS.
Also, set replica to 0
"""
if not quiet:
print("Put '{}' mapping file...".format(mapping_file.name))
res = requests.put(url=ELASTICSEARCH_URL + index_name,
data=mapping_file,
headers={'Content-Type': 'application/json'})
if res.status_code != 200:
print(bcolors.FAIL + "An error occured")
print(res.text + bcolors.ENDC)
else:
if not quiet:
print(bcolors.OKGREEN + "Mapping for '{}' sent".format(index_name) + bcolors.ENDC)
put_setting(index_name, 0, quiet)
def check_all_data_is_saved(data_file, index_name, quiet=False):
"""
Check if found same number of documents in ELS as number of line in file.
Detect type of data to be searched in ELS.
Return True if the number is the same.
"""
if not quiet:
print('Wait a second and check that all the data has been sent')
time.sleep(1)
data_file.seek(0)
lines = data_file.readlines()
file_nb_line = int(len(lines) / 2)
if not quiet:
print("\tFound: {} lines in '{}' file".format(file_nb_line, data_file.name))
payload = {"track_total_hits": "true"}
res = requests.get(url=ELASTICSEARCH_URL + index_name + '/_search?size=0',
data=json.dumps(payload),
headers={'Content-Type': 'application/x-ndjson'})
if res.status_code != 200:
print(bcolors.FAIL + "An error occured")
print(res.text + bcolors.ENDC)
els_nb_doc = res.json()['hits']['total']['value']
if not quiet:
print("\tFound: {} documents in index '{}' in ELS".format(els_nb_doc, index_name))
if file_nb_line != els_nb_doc:
print(bcolors.WARNING + 'Look out! Not all the data has been found in ELS' + bcolors.ENDC)
elif not quiet:
print(bcolors.OKGREEN + 'All data is in ELS, it\'s ok' + bcolors.ENDC)
return file_nb_line == els_nb_doc
def put_setting(index_name, nb_replicas=0, quiet=False):
"""
Update setting of index to set number of replica to 0
"""
if not quiet:
print("Update setting of index '{}' - set replica to 0...".format(index_name))
query = {"index" : {"number_of_replicas" : nb_replicas}}
res = requests.put(url=ELASTICSEARCH_URL + index_name + "/_settings",
data=json.dumps(query),
headers={'Content-Type': 'application/json'})
if res.status_code != 200:
print("An error occured")
print(res.text)
else:
if not quiet:
print('Setting of index updated')
if __name__ == '__main__':
main()