From 1430566c6cd630e8639cccc4813d7bcfc9cae8b8 Mon Sep 17 00:00:00 2001 From: "Maxence G. de Montauzan" Date: Mon, 6 Apr 2020 03:02:15 +0200 Subject: [PATCH] (send_data) Refactor script Change structure to have a clean Main function Simplify arguments and operation --- send_data.py | 316 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 201 insertions(+), 115 deletions(-) diff --git a/send_data.py b/send_data.py index 106e89d..ab1e3fe 100644 --- a/send_data.py +++ b/send_data.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Send JSON files to ELS @@ -6,9 +6,172 @@ import sys import argparse -import requests import json import time +import requests + +# Default file names +DEFAULT_SONG_FILE = 'es-songs.json' +DEFAULT_ALBUM_FILE = 'es-albums.json' +DEFAULT_ARTIST_FILE = 'es-artists.json' +DEFAULT_MAPPING_FILE = 'mapping.json' + +# Global values / set as default values +ELASTICSEARCH_URL = 'http://localhost:9200/' +INDEX_NAME = "itunessongs" +# Why global variable ? +# Because if I want to use a class to do post/put operation, I will use class or instance variables + +def main(): + """ + Main function + """ + global ELASTICSEARCH_URL, INDEX_NAME + + args = create_parser().parse_args() + + if not args.song and args.ALL: + print(__file__ + ': error: argument -A/--ALL: not allowed with argument -s/--song') + sys.exit(-1) + + # Overloaded setting value + ELASTICSEARCH_URL = args.elasticsearch_url + INDEX_NAME = args.index_name # Used for deletion / creation + # TODO Improvement: check first line of file(s) to get index + + if not args.quiet: + print("*** Settings values ***") + print("Elasticsearch URL:\t" + ELASTICSEARCH_URL) + print("Index name:\t\t" + INDEX_NAME) + print() + else: + print("Processing...") + + if args.DELETE: + delete_index(INDEX_NAME, args.quiet) + put_mapping(INDEX_NAME, args.mapping_file, args.quiet) + print() + #TODO Detect if index doesn't exist + + #TODO Use log instead print + + # Send song data + if args.song or args.ALL: + if not args.song_file: + try: + song_file = open(DEFAULT_SONG_FILE, 'r') + except FileNotFoundError as error: + print(error) + print("Default file not found.\nUse -sf argument, or -h for more help") + sys.exit(2) + else: + song_file = args.song_file + if not args.quiet: + print("Song file: '{}'".format(song_file.name)) + + send_data(song_file, args.quiet) + check_all_data_is_saved(song_file, args.quiet) + # ? Improvment: allow to stop script if all data not sent? + if not args.quiet: + print() + else: + print('Songs sent') + elif not args.quiet: + print("Sending songs is disabled.") + print() + + # Send artist data + if args.artist_file or args.ALL: + artist_file = args.artist_file + if not artist_file: + if not args.quiet: + print('No artist file specified, take default file...') + artist_file = open(DEFAULT_ARTIST_FILE, 'r') + + if not args.quiet: + print("Artist file: '{}'".format(artist_file.name)) + + send_data(artist_file, args.quiet) + check_all_data_is_saved(artist_file, args.quiet) + if not args.quiet: + print() + else: + print('Artist sent') + + if args.album_file or args.ALL: + album_file = args.album_file + if not album_file: + if not args.quiet: + print('No album file specified, take default file...') + album_file = open(DEFAULT_ALBUM_FILE, 'r') + + if not args.quiet: + print("Take file '{}' to send song data".format(album_file.name)) + send_data(album_file, args.quiet) + check_all_data_is_saved(album_file, args.quiet) + if not args.quiet: + print() + else: + print('Album sent') + + print("I'm done!") + +def create_parser(): + """ + Create parser with all options, default values, etc. + Return the parser ready to parse args + """ + parser = argparse.ArgumentParser( + description=''' + Send JSON files formated for bulk Elasticsearch operation to an Elasticsearch. + + By default: send song data enable, send album & artist data disabled. + Check that all the data has been sent. + + Detect if index doesn't exist and create it with a mapping file (see -map and -idx argument). + Remeber : it's cumulative! If you want to remove songs/artits/albums, + you have to delete and re-create the index (use -D option). + ''' + ) + # Bulk + parser.add_argument('-q', '--quiet', action='store_true', + help="Disable main output") + # Choose what to enable for send and files + sending_group = parser.add_argument_group("Sending options") + song_group = sending_group.add_mutually_exclusive_group() + song_group.add_argument('-sf', '--song-file', type=argparse.FileType('r'), + help='Song file data to send (default: \'{}\').'.format(DEFAULT_SONG_FILE)) + sending_group.add_argument('-al', '--album-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ALBUM_FILE, + help='Enable sending album data. Optionally, precise the album data file (default: \'{}\')' + .format(DEFAULT_ALBUM_FILE)) + sending_group.add_argument('-ar', '--artist-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ARTIST_FILE, + help='Enable sending artist data. Optionally, precise the artist data file (default: \'{}\')' + .format(DEFAULT_ARTIST_FILE)) + song_group.add_argument('-s', '--song', action='store_false', + help='Disable sending song data') + + # Mode + mode_group = parser.add_argument_group('Mode') + mode_group.add_argument('-A', '--ALL', action='store_true', + help='Send all possible data: song, artist and album') + mode_group.add_argument('-D', '--DELETE', action='store_true', + help='''Delete old index and create a new. + See -idx argument to set index name. + See -map arguement to set mapping file.''') + + # Global Settings + g_settings_group = parser.add_argument_group('Global Settings') + g_settings_group.add_argument('-els', '--elasticsearch-url', default=ELASTICSEARCH_URL, nargs='?', + help="Elasticsearch URL (default: \'{}\')".format(ELASTICSEARCH_URL)) + g_settings_group.add_argument('-idx', '--index-name', default=INDEX_NAME, nargs='?', + help="""Index name in Elasticsearch (default: \'{}\'). + Used when creating the index: if it does not exist or after deletion. + When sending data, the index name is specified in JSON files.""".format(INDEX_NAME)) + g_settings_group.add_argument('-map', '--mapping-file', type=argparse.FileType('r'), default=DEFAULT_MAPPING_FILE, nargs='?', + help='If deleting index or if index does not exist, mapping file to use (default: \'{}\')' + .format(DEFAULT_MAPPING_FILE)) + + return parser def send_data(file, quiet=False): """ @@ -27,10 +190,7 @@ def send_data(file, quiet=False): print(res.text) else: if not quiet: - print("File '{} sended to Elasticsearch!".format(file.name)) - - check_all_data_is_saved(file) - + print("File '{}' sent to Elasticsearch!".format(file.name)) def delete_index(index_name, quiet=False): """ @@ -52,6 +212,7 @@ def delete_index(index_name, quiet=False): def put_mapping(index_name, mapping_file, quiet=False): """ Send a mapping file for an index to ELS. + Also, set replica to 0 """ if not quiet: print("Put '{}' mapping file...".format(mapping_file.name)) @@ -63,37 +224,58 @@ def put_mapping(index_name, mapping_file, quiet=False): print(res.text) else: if not quiet: - print("File '{} sended to Elasticsearch!".format(mapping_file.name)) + print("File '{}' sent to Elasticsearch!".format(mapping_file.name)) - put_setting(index_name, quiet) + put_setting(index_name, 0, quiet) -def check_all_data_is_saved(file): - time.sleep(2) - with open(file.name, 'r') as file: +def check_all_data_is_saved(data_file, quiet=False): + """ + Check if found same number of documents in ELS as number of line in file. + Detect type of data to be searched in ELS. + Return True if the number is the same. + """ + if not quiet: + print('Wait a second and check that all the data has been sent') + time.sleep(1) + + with open(data_file.name, 'r') as file: lines = file.readlines() file_nb_line = len(lines) / 2 extract = json.loads(lines[1]) - type = extract['type'] + data_type = extract['type'] - payload = {"track_total_hits": "true", "query": {"constant_score": {"filter": {"term": {"type": type}}}}} + if not quiet: + print("\tFound: {} lines in '{}' file".format(file_nb_line, data_file.name)) + + payload = {"track_total_hits": "true", "query": {"constant_score": {"filter": {"term": {"type": data_type}}}}} res = requests.get(url=ELASTICSEARCH_URL + INDEX_NAME + '/_search?size=0', data=json.dumps(payload), headers={'Content-Type': 'application/x-ndjson'}) + if res.status_code != 200: + print("An error occured") + print(res.text) - element_in_els = res.json()['hits']['total']['value'] + els_nb_doc = res.json()['hits']['total']['value'] - print(element_in_els) - print(str(int(file_nb_line))) + if not quiet: + print("\tFound: {} documents with '{}' type in ELS".format(els_nb_doc, data_type)) -def put_setting(index_name, quiet=False): + if file_nb_line != els_nb_doc: + print('Look out! Not all the data has been found in ELS') + elif not quiet: + print('All data is in ELS, it\'s ok') + + return file_nb_line == els_nb_doc + +def put_setting(index_name, nb_replicas=0, quiet=False): """ Update setting of index to set number of replica to 0 """ if not quiet: print("Update setting of index '{}' - set replica to 0...".format(index_name)) - query = {"index" : {"number_of_replicas" : 0}} + query = {"index" : {"number_of_replicas" : nb_replicas}} res = requests.put(url=ELASTICSEARCH_URL + index_name + "/_settings", data=json.dumps(query), headers={'Content-Type': 'application/json'}) @@ -104,102 +286,6 @@ def put_setting(index_name, quiet=False): if not quiet: print('Setting of index updated') -#### main block #### - -# Settings var (can be overloaded) -ELASTICSEARCH_URL = 'http://localhost:9200/' -INDEX_NAME = "itunessongs" - -# Default file names -DEFAULT_SONG_FILE = 'es-songs.json' -DEFAULT_ALBUM_FILE = 'es-albums.json' -DEFAULT_ARTIST_FILE = 'es-artists.json' -DEFAULT_MAPPING_FILE = 'mapping.json' - -# Get options -parser = argparse.ArgumentParser( - description=''' - Send JSON files formated for bulk Elasticsearch operation to an Elasticsearch. - - By default: send song data enable, send album & artist data disabled. - ''' -) -# Bulk -parser.add_argument('-q', '--quiet', action='store_true', - help="Disable main output") -# Choose what to enable for send and files -sending_group = parser.add_argument_group("Sending options") -song_group = sending_group.add_mutually_exclusive_group() -song_group.add_argument('-s', '--song', action='store_false', - help='Disable send song data') -song_group.add_argument('-sf', '--song-file', type=argparse.FileType('r'), - help='Song file data to send (default: \'{}\').'.format(DEFAULT_SONG_FILE)) -sending_group.add_argument('-al', '--album-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ALBUM_FILE, - help='Enable send album data. Optionally, precise the album file (default: \'{}\')'.format(DEFAULT_ALBUM_FILE)) -sending_group.add_argument('-ar', '--artist-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ARTIST_FILE, - help='Enable send artist data. Optionally, precise the artist file (default: \'{}\')'.format(DEFAULT_ARTIST_FILE)) -sending_group.add_argument('-m', '--mapping-file', type=argparse.FileType('r'), default=DEFAULT_MAPPING_FILE, - help='If deleting index, mapping file to send (default: \'{}\')'.format(DEFAULT_MAPPING_FILE)) -# Mode -mode_group = parser.add_argument_group('Mode') -mode_group.add_argument('-A', '--ALL', action='store_true', - help='Send all possible data: song, artist and album') -mode_group.add_argument('-D', '--DELETE', action='store_true', - help='Delete old index (precise name with -idx argument). This will send a mapping to ELS.') -# Settings -g_settings_group = parser.add_argument_group('Global Settings') -g_settings_group.add_argument('-els', '--elasticsearch-url', default=ELASTICSEARCH_URL, nargs='?', - help="Elasticsearch URL to send data (default: \'{}\')".format(ELASTICSEARCH_URL)) -g_settings_group.add_argument('-idx', '--index-name', default=INDEX_NAME, - help="Index name in Elasticsearch ONLY FOR DELETING! (default: \'{}\'). ".format(INDEX_NAME) + - "When sending data, index name is specified in JSON files.") if __name__ == '__main__': - args = parser.parse_args() - - # Overloaded setting value - INDEX_NAME = args.index_name - # TODO Critical: index in in the file! - ELASTICSEARCH_URL = args.elasticsearch_url - - if not args.quiet: - print("*** Settings values ***") - print("Elasticsearch URL:\t" + ELASTICSEARCH_URL) - print("Index name:\t\t" + INDEX_NAME) - print("") - else: - print("Processing...") - - if args.DELETE: - delete_index(INDEX_NAME, args.quiet) - put_mapping(INDEX_NAME, args.mapping_file, args.quiet) - - if args.song or args.ALL: - # Retrieve default song file if not precised - if not args.song_file: - try: - song_file = open(DEFAULT_SONG_FILE, 'r') - except FileNotFoundError: # Theoretically, can occur only when default file not found - print("Error: can't open default music file: [Errno 2] No such file or directory: '{}'.".format(DEFAULT_SONG_FILE)) - print("Use -sf argument, or -h for more help") - sys.exit(2) - else: - song_file = args.song_file - if not args.quiet: - print("Take file '{}' to send song data".format(song_file.name)) - send_data(song_file, args.quiet) - if args.artist_file or args.ALL: - artist_file = args.artist_file - if not args.quiet: - print("Take file '{}' to send song data".format(artist_file.name)) - send_data(artist_file, args.quiet) - - if args.album_file or args.ALL: - album_file = args.album_file - if not args.quiet: - print("Take file '{}' to send song data".format(album_file.name)) - send_data(album_file, args.quiet) - - check_all_data_is_saved(artist_file) - check_all_data_is_saved(song_file) - check_all_data_is_saved(album_file) + main()