(send_data) Refactor script
Change structure to have a clean Main function Simplify arguments and operation
This commit is contained in:
316
send_data.py
316
send_data.py
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Send JSON files to ELS
|
||||
@@ -6,9 +6,172 @@
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
|
||||
# Default file names
|
||||
DEFAULT_SONG_FILE = 'es-songs.json'
|
||||
DEFAULT_ALBUM_FILE = 'es-albums.json'
|
||||
DEFAULT_ARTIST_FILE = 'es-artists.json'
|
||||
DEFAULT_MAPPING_FILE = 'mapping.json'
|
||||
|
||||
# Global values / set as default values
|
||||
ELASTICSEARCH_URL = 'http://localhost:9200/'
|
||||
INDEX_NAME = "itunessongs"
|
||||
# Why global variable ?
|
||||
# Because if I want to use a class to do post/put operation, I will use class or instance variables
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main function
|
||||
"""
|
||||
global ELASTICSEARCH_URL, INDEX_NAME
|
||||
|
||||
args = create_parser().parse_args()
|
||||
|
||||
if not args.song and args.ALL:
|
||||
print(__file__ + ': error: argument -A/--ALL: not allowed with argument -s/--song')
|
||||
sys.exit(-1)
|
||||
|
||||
# Overloaded setting value
|
||||
ELASTICSEARCH_URL = args.elasticsearch_url
|
||||
INDEX_NAME = args.index_name # Used for deletion / creation
|
||||
# TODO Improvement: check first line of file(s) to get index
|
||||
|
||||
if not args.quiet:
|
||||
print("*** Settings values ***")
|
||||
print("Elasticsearch URL:\t" + ELASTICSEARCH_URL)
|
||||
print("Index name:\t\t" + INDEX_NAME)
|
||||
print()
|
||||
else:
|
||||
print("Processing...")
|
||||
|
||||
if args.DELETE:
|
||||
delete_index(INDEX_NAME, args.quiet)
|
||||
put_mapping(INDEX_NAME, args.mapping_file, args.quiet)
|
||||
print()
|
||||
#TODO Detect if index doesn't exist
|
||||
|
||||
#TODO Use log instead print
|
||||
|
||||
# Send song data
|
||||
if args.song or args.ALL:
|
||||
if not args.song_file:
|
||||
try:
|
||||
song_file = open(DEFAULT_SONG_FILE, 'r')
|
||||
except FileNotFoundError as error:
|
||||
print(error)
|
||||
print("Default file not found.\nUse -sf argument, or -h for more help")
|
||||
sys.exit(2)
|
||||
else:
|
||||
song_file = args.song_file
|
||||
if not args.quiet:
|
||||
print("Song file: '{}'".format(song_file.name))
|
||||
|
||||
send_data(song_file, args.quiet)
|
||||
check_all_data_is_saved(song_file, args.quiet)
|
||||
# ? Improvment: allow to stop script if all data not sent?
|
||||
if not args.quiet:
|
||||
print()
|
||||
else:
|
||||
print('Songs sent')
|
||||
elif not args.quiet:
|
||||
print("Sending songs is disabled.")
|
||||
print()
|
||||
|
||||
# Send artist data
|
||||
if args.artist_file or args.ALL:
|
||||
artist_file = args.artist_file
|
||||
if not artist_file:
|
||||
if not args.quiet:
|
||||
print('No artist file specified, take default file...')
|
||||
artist_file = open(DEFAULT_ARTIST_FILE, 'r')
|
||||
|
||||
if not args.quiet:
|
||||
print("Artist file: '{}'".format(artist_file.name))
|
||||
|
||||
send_data(artist_file, args.quiet)
|
||||
check_all_data_is_saved(artist_file, args.quiet)
|
||||
if not args.quiet:
|
||||
print()
|
||||
else:
|
||||
print('Artist sent')
|
||||
|
||||
if args.album_file or args.ALL:
|
||||
album_file = args.album_file
|
||||
if not album_file:
|
||||
if not args.quiet:
|
||||
print('No album file specified, take default file...')
|
||||
album_file = open(DEFAULT_ALBUM_FILE, 'r')
|
||||
|
||||
if not args.quiet:
|
||||
print("Take file '{}' to send song data".format(album_file.name))
|
||||
send_data(album_file, args.quiet)
|
||||
check_all_data_is_saved(album_file, args.quiet)
|
||||
if not args.quiet:
|
||||
print()
|
||||
else:
|
||||
print('Album sent')
|
||||
|
||||
print("I'm done!")
|
||||
|
||||
def create_parser():
|
||||
"""
|
||||
Create parser with all options, default values, etc.
|
||||
Return the parser ready to parse args
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='''
|
||||
Send JSON files formated for bulk Elasticsearch operation to an Elasticsearch.
|
||||
|
||||
By default: send song data enable, send album & artist data disabled.
|
||||
Check that all the data has been sent.
|
||||
|
||||
Detect if index doesn't exist and create it with a mapping file (see -map and -idx argument).
|
||||
Remeber : it's cumulative! If you want to remove songs/artits/albums,
|
||||
you have to delete and re-create the index (use -D option).
|
||||
'''
|
||||
)
|
||||
# Bulk
|
||||
parser.add_argument('-q', '--quiet', action='store_true',
|
||||
help="Disable main output")
|
||||
# Choose what to enable for send and files
|
||||
sending_group = parser.add_argument_group("Sending options")
|
||||
song_group = sending_group.add_mutually_exclusive_group()
|
||||
song_group.add_argument('-sf', '--song-file', type=argparse.FileType('r'),
|
||||
help='Song file data to send (default: \'{}\').'.format(DEFAULT_SONG_FILE))
|
||||
sending_group.add_argument('-al', '--album-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ALBUM_FILE,
|
||||
help='Enable sending album data. Optionally, precise the album data file (default: \'{}\')'
|
||||
.format(DEFAULT_ALBUM_FILE))
|
||||
sending_group.add_argument('-ar', '--artist-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ARTIST_FILE,
|
||||
help='Enable sending artist data. Optionally, precise the artist data file (default: \'{}\')'
|
||||
.format(DEFAULT_ARTIST_FILE))
|
||||
song_group.add_argument('-s', '--song', action='store_false',
|
||||
help='Disable sending song data')
|
||||
|
||||
# Mode
|
||||
mode_group = parser.add_argument_group('Mode')
|
||||
mode_group.add_argument('-A', '--ALL', action='store_true',
|
||||
help='Send all possible data: song, artist and album')
|
||||
mode_group.add_argument('-D', '--DELETE', action='store_true',
|
||||
help='''Delete old index and create a new.
|
||||
See -idx argument to set index name.
|
||||
See -map arguement to set mapping file.''')
|
||||
|
||||
# Global Settings
|
||||
g_settings_group = parser.add_argument_group('Global Settings')
|
||||
g_settings_group.add_argument('-els', '--elasticsearch-url', default=ELASTICSEARCH_URL, nargs='?',
|
||||
help="Elasticsearch URL (default: \'{}\')".format(ELASTICSEARCH_URL))
|
||||
g_settings_group.add_argument('-idx', '--index-name', default=INDEX_NAME, nargs='?',
|
||||
help="""Index name in Elasticsearch (default: \'{}\').
|
||||
Used when creating the index: if it does not exist or after deletion.
|
||||
When sending data, the index name is specified in JSON files.""".format(INDEX_NAME))
|
||||
g_settings_group.add_argument('-map', '--mapping-file', type=argparse.FileType('r'), default=DEFAULT_MAPPING_FILE, nargs='?',
|
||||
help='If deleting index or if index does not exist, mapping file to use (default: \'{}\')'
|
||||
.format(DEFAULT_MAPPING_FILE))
|
||||
|
||||
return parser
|
||||
|
||||
def send_data(file, quiet=False):
|
||||
"""
|
||||
@@ -27,10 +190,7 @@ def send_data(file, quiet=False):
|
||||
print(res.text)
|
||||
else:
|
||||
if not quiet:
|
||||
print("File '{} sended to Elasticsearch!".format(file.name))
|
||||
|
||||
check_all_data_is_saved(file)
|
||||
|
||||
print("File '{}' sent to Elasticsearch!".format(file.name))
|
||||
|
||||
def delete_index(index_name, quiet=False):
|
||||
"""
|
||||
@@ -52,6 +212,7 @@ def delete_index(index_name, quiet=False):
|
||||
def put_mapping(index_name, mapping_file, quiet=False):
|
||||
"""
|
||||
Send a mapping file for an index to ELS.
|
||||
Also, set replica to 0
|
||||
"""
|
||||
if not quiet:
|
||||
print("Put '{}' mapping file...".format(mapping_file.name))
|
||||
@@ -63,37 +224,58 @@ def put_mapping(index_name, mapping_file, quiet=False):
|
||||
print(res.text)
|
||||
else:
|
||||
if not quiet:
|
||||
print("File '{} sended to Elasticsearch!".format(mapping_file.name))
|
||||
print("File '{}' sent to Elasticsearch!".format(mapping_file.name))
|
||||
|
||||
put_setting(index_name, quiet)
|
||||
put_setting(index_name, 0, quiet)
|
||||
|
||||
def check_all_data_is_saved(file):
|
||||
time.sleep(2)
|
||||
with open(file.name, 'r') as file:
|
||||
def check_all_data_is_saved(data_file, quiet=False):
|
||||
"""
|
||||
Check if found same number of documents in ELS as number of line in file.
|
||||
Detect type of data to be searched in ELS.
|
||||
Return True if the number is the same.
|
||||
"""
|
||||
if not quiet:
|
||||
print('Wait a second and check that all the data has been sent')
|
||||
time.sleep(1)
|
||||
|
||||
with open(data_file.name, 'r') as file:
|
||||
lines = file.readlines()
|
||||
file_nb_line = len(lines) / 2
|
||||
extract = json.loads(lines[1])
|
||||
type = extract['type']
|
||||
data_type = extract['type']
|
||||
|
||||
payload = {"track_total_hits": "true", "query": {"constant_score": {"filter": {"term": {"type": type}}}}}
|
||||
if not quiet:
|
||||
print("\tFound: {} lines in '{}' file".format(file_nb_line, data_file.name))
|
||||
|
||||
payload = {"track_total_hits": "true", "query": {"constant_score": {"filter": {"term": {"type": data_type}}}}}
|
||||
|
||||
res = requests.get(url=ELASTICSEARCH_URL + INDEX_NAME + '/_search?size=0',
|
||||
data=json.dumps(payload),
|
||||
headers={'Content-Type': 'application/x-ndjson'})
|
||||
if res.status_code != 200:
|
||||
print("An error occured")
|
||||
print(res.text)
|
||||
|
||||
element_in_els = res.json()['hits']['total']['value']
|
||||
els_nb_doc = res.json()['hits']['total']['value']
|
||||
|
||||
print(element_in_els)
|
||||
print(str(int(file_nb_line)))
|
||||
if not quiet:
|
||||
print("\tFound: {} documents with '{}' type in ELS".format(els_nb_doc, data_type))
|
||||
|
||||
def put_setting(index_name, quiet=False):
|
||||
if file_nb_line != els_nb_doc:
|
||||
print('Look out! Not all the data has been found in ELS')
|
||||
elif not quiet:
|
||||
print('All data is in ELS, it\'s ok')
|
||||
|
||||
return file_nb_line == els_nb_doc
|
||||
|
||||
def put_setting(index_name, nb_replicas=0, quiet=False):
|
||||
"""
|
||||
Update setting of index to set number of replica to 0
|
||||
"""
|
||||
if not quiet:
|
||||
print("Update setting of index '{}' - set replica to 0...".format(index_name))
|
||||
|
||||
query = {"index" : {"number_of_replicas" : 0}}
|
||||
query = {"index" : {"number_of_replicas" : nb_replicas}}
|
||||
res = requests.put(url=ELASTICSEARCH_URL + index_name + "/_settings",
|
||||
data=json.dumps(query),
|
||||
headers={'Content-Type': 'application/json'})
|
||||
@@ -104,102 +286,6 @@ def put_setting(index_name, quiet=False):
|
||||
if not quiet:
|
||||
print('Setting of index updated')
|
||||
|
||||
#### main block ####
|
||||
|
||||
# Settings var (can be overloaded)
|
||||
ELASTICSEARCH_URL = 'http://localhost:9200/'
|
||||
INDEX_NAME = "itunessongs"
|
||||
|
||||
# Default file names
|
||||
DEFAULT_SONG_FILE = 'es-songs.json'
|
||||
DEFAULT_ALBUM_FILE = 'es-albums.json'
|
||||
DEFAULT_ARTIST_FILE = 'es-artists.json'
|
||||
DEFAULT_MAPPING_FILE = 'mapping.json'
|
||||
|
||||
# Get options
|
||||
parser = argparse.ArgumentParser(
|
||||
description='''
|
||||
Send JSON files formated for bulk Elasticsearch operation to an Elasticsearch.
|
||||
|
||||
By default: send song data enable, send album & artist data disabled.
|
||||
'''
|
||||
)
|
||||
# Bulk
|
||||
parser.add_argument('-q', '--quiet', action='store_true',
|
||||
help="Disable main output")
|
||||
# Choose what to enable for send and files
|
||||
sending_group = parser.add_argument_group("Sending options")
|
||||
song_group = sending_group.add_mutually_exclusive_group()
|
||||
song_group.add_argument('-s', '--song', action='store_false',
|
||||
help='Disable send song data')
|
||||
song_group.add_argument('-sf', '--song-file', type=argparse.FileType('r'),
|
||||
help='Song file data to send (default: \'{}\').'.format(DEFAULT_SONG_FILE))
|
||||
sending_group.add_argument('-al', '--album-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ALBUM_FILE,
|
||||
help='Enable send album data. Optionally, precise the album file (default: \'{}\')'.format(DEFAULT_ALBUM_FILE))
|
||||
sending_group.add_argument('-ar', '--artist-file', nargs='?', type=argparse.FileType('r'), const=DEFAULT_ARTIST_FILE,
|
||||
help='Enable send artist data. Optionally, precise the artist file (default: \'{}\')'.format(DEFAULT_ARTIST_FILE))
|
||||
sending_group.add_argument('-m', '--mapping-file', type=argparse.FileType('r'), default=DEFAULT_MAPPING_FILE,
|
||||
help='If deleting index, mapping file to send (default: \'{}\')'.format(DEFAULT_MAPPING_FILE))
|
||||
# Mode
|
||||
mode_group = parser.add_argument_group('Mode')
|
||||
mode_group.add_argument('-A', '--ALL', action='store_true',
|
||||
help='Send all possible data: song, artist and album')
|
||||
mode_group.add_argument('-D', '--DELETE', action='store_true',
|
||||
help='Delete old index (precise name with -idx argument). This will send a mapping to ELS.')
|
||||
# Settings
|
||||
g_settings_group = parser.add_argument_group('Global Settings')
|
||||
g_settings_group.add_argument('-els', '--elasticsearch-url', default=ELASTICSEARCH_URL, nargs='?',
|
||||
help="Elasticsearch URL to send data (default: \'{}\')".format(ELASTICSEARCH_URL))
|
||||
g_settings_group.add_argument('-idx', '--index-name', default=INDEX_NAME,
|
||||
help="Index name in Elasticsearch ONLY FOR DELETING! (default: \'{}\'). ".format(INDEX_NAME) +
|
||||
"When sending data, index name is specified in JSON files.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parser.parse_args()
|
||||
|
||||
# Overloaded setting value
|
||||
INDEX_NAME = args.index_name
|
||||
# TODO Critical: index in in the file!
|
||||
ELASTICSEARCH_URL = args.elasticsearch_url
|
||||
|
||||
if not args.quiet:
|
||||
print("*** Settings values ***")
|
||||
print("Elasticsearch URL:\t" + ELASTICSEARCH_URL)
|
||||
print("Index name:\t\t" + INDEX_NAME)
|
||||
print("")
|
||||
else:
|
||||
print("Processing...")
|
||||
|
||||
if args.DELETE:
|
||||
delete_index(INDEX_NAME, args.quiet)
|
||||
put_mapping(INDEX_NAME, args.mapping_file, args.quiet)
|
||||
|
||||
if args.song or args.ALL:
|
||||
# Retrieve default song file if not precised
|
||||
if not args.song_file:
|
||||
try:
|
||||
song_file = open(DEFAULT_SONG_FILE, 'r')
|
||||
except FileNotFoundError: # Theoretically, can occur only when default file not found
|
||||
print("Error: can't open default music file: [Errno 2] No such file or directory: '{}'.".format(DEFAULT_SONG_FILE))
|
||||
print("Use -sf argument, or -h for more help")
|
||||
sys.exit(2)
|
||||
else:
|
||||
song_file = args.song_file
|
||||
if not args.quiet:
|
||||
print("Take file '{}' to send song data".format(song_file.name))
|
||||
send_data(song_file, args.quiet)
|
||||
if args.artist_file or args.ALL:
|
||||
artist_file = args.artist_file
|
||||
if not args.quiet:
|
||||
print("Take file '{}' to send song data".format(artist_file.name))
|
||||
send_data(artist_file, args.quiet)
|
||||
|
||||
if args.album_file or args.ALL:
|
||||
album_file = args.album_file
|
||||
if not args.quiet:
|
||||
print("Take file '{}' to send song data".format(album_file.name))
|
||||
send_data(album_file, args.quiet)
|
||||
|
||||
check_all_data_is_saved(artist_file)
|
||||
check_all_data_is_saved(song_file)
|
||||
check_all_data_is_saved(album_file)
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user