|
@@ -89,16 +89,11 @@ def feed_queue():
|
|
""" Search for pending scheduled work in
|
|
""" Search for pending scheduled work in
|
|
server and add them to a memory queue. """
|
|
server and add them to a memory queue. """
|
|
try:
|
|
try:
|
|
- response = client.get_schedule_pending()
|
|
|
|
- downloaded_counter = len(response['items'])
|
|
|
|
- for item in response['items']:
|
|
|
|
- queue.put(item)
|
|
|
|
-
|
|
|
|
- if downloaded_counter:
|
|
|
|
- log.info(('[feed_queue] {} new '
|
|
|
|
- + 'pending schedule items.') \
|
|
|
|
- .format(downloaded_counter)
|
|
|
|
- )
|
|
|
|
|
|
+ response = get_pendings()
|
|
|
|
+ # response = client.get_schedule_pending()
|
|
|
|
+ # downloaded_counter = len(response['items'])
|
|
|
|
+ # for item in response['items']:
|
|
|
|
+ queue.put(response)
|
|
|
|
|
|
if queue.qsize() > 0:
|
|
if queue.qsize() > 0:
|
|
if queue_mode == QUEUE_THREAD:
|
|
if queue_mode == QUEUE_THREAD:
|
|
@@ -188,34 +183,38 @@ def process_queue_with_threads():
|
|
def process_segment(item, audios=None, calibration=None):
|
|
def process_segment(item, audios=None, calibration=None):
|
|
""" Procesa una hora de audio """
|
|
""" Procesa una hora de audio """
|
|
|
|
|
|
- station = item['station']
|
|
|
|
|
|
+ station = item['estacion']
|
|
|
|
|
|
if not calibration:
|
|
if not calibration:
|
|
calibration = calibrations.get(station)
|
|
calibration = calibrations.get(station)
|
|
|
|
|
|
tolerance = calibration['tolerance']
|
|
tolerance = calibration['tolerance']
|
|
- date = dateutil.parser.parse(item['date'])
|
|
|
|
|
|
+ date = dateutil.parser.parse(item['fecha'])
|
|
segment_size = calibration['segmentSize']
|
|
segment_size = calibration['segmentSize']
|
|
audio_length = 0
|
|
audio_length = 0
|
|
|
|
|
|
log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
|
|
log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
|
|
.format(
|
|
.format(
|
|
- calibration['threshold'],
|
|
|
|
- calibration['tolerance'],
|
|
|
|
- calibration['fallTolerance'],
|
|
|
|
- calibration['segmentSize'],
|
|
|
|
- calibration['hourlyOffset'],
|
|
|
|
- item,
|
|
|
|
- )
|
|
|
|
|
|
+ calibration['threshold'],
|
|
|
|
+ calibration['tolerance'],
|
|
|
|
+ calibration['fallTolerance'],
|
|
|
|
+ calibration['segmentSize'],
|
|
|
|
+ calibration['hourlyOffset'],
|
|
|
|
+ item,
|
|
|
|
+ )
|
|
)
|
|
)
|
|
|
|
|
|
# 1. obtener el audio desde firebase
|
|
# 1. obtener el audio desde firebase
|
|
# y calcular su fingerprint.
|
|
# y calcular su fingerprint.
|
|
try:
|
|
try:
|
|
- filename, md5hash = cloud_download(ad_key=item['ad'])
|
|
|
|
- if not filename:
|
|
|
|
- log.info('[process_segment] ad file missing')
|
|
|
|
- return
|
|
|
|
|
|
+ filenames = []
|
|
|
|
+ for i in item["elementos"]:
|
|
|
|
+ filename, md5hash = cloud_download(ad_key=i["anuncio"])
|
|
|
|
+ if filename:
|
|
|
|
+ filenames.append((filename, md5hash))
|
|
|
|
+ else:
|
|
|
|
+ log.info('[process_segment] ad file missing')
|
|
|
|
+
|
|
except Exception as err:
|
|
except Exception as err:
|
|
log.error('[process_segment] [{}] {}'.format(station, err))
|
|
log.error('[process_segment] [{}] {}'.format(station, err))
|
|
return
|
|
return
|
|
@@ -223,6 +222,7 @@ def process_segment(item, audios=None, calibration=None):
|
|
# 1.1 Calcular el número de segmentos requeridos
|
|
# 1.1 Calcular el número de segmentos requeridos
|
|
# de acuerdo a la duración total del audio.
|
|
# de acuerdo a la duración total del audio.
|
|
try:
|
|
try:
|
|
|
|
+ filename, md5hash = filenames[0]
|
|
audio = mutagen.mp3.MP3(filename)
|
|
audio = mutagen.mp3.MP3(filename)
|
|
audio_length = audio.info.length
|
|
audio_length = audio.info.length
|
|
|
|
|
|
@@ -241,7 +241,9 @@ def process_segment(item, audios=None, calibration=None):
|
|
|
|
|
|
dejavu = Dejavu({"database_type": "mem"})
|
|
dejavu = Dejavu({"database_type": "mem"})
|
|
try:
|
|
try:
|
|
- dejavu.fingerprint_file(filename)
|
|
|
|
|
|
+ for i in filenames:
|
|
|
|
+ filename = i[0]
|
|
|
|
+ dejavu.fingerprint_file(filename)
|
|
except Exception as ex:
|
|
except Exception as ex:
|
|
log.error('[process_segment] cannot fingerprint: {}'.format(ex))
|
|
log.error('[process_segment] cannot fingerprint: {}'.format(ex))
|
|
|
|
|
|
@@ -261,6 +263,7 @@ def process_segment(item, audios=None, calibration=None):
|
|
# 2. Read the list of files from local database
|
|
# 2. Read the list of files from local database
|
|
audios_counter = 0
|
|
audios_counter = 0
|
|
results = []
|
|
results = []
|
|
|
|
+ v = []
|
|
|
|
|
|
for path, name, ts in audios_iterable:
|
|
for path, name, ts in audios_iterable:
|
|
short_path = os.path.join(station, name)
|
|
short_path = os.path.join(station, name)
|
|
@@ -271,21 +274,27 @@ def process_segment(item, audios=None, calibration=None):
|
|
download_file(path)
|
|
download_file(path)
|
|
|
|
|
|
try:
|
|
try:
|
|
- for match in dejavu.recognize(recognizer, path, segment_size,
|
|
|
|
- ads_filter=[md5hash]):
|
|
|
|
|
|
+ for match in dejavu.recognize(recognizer, path, segment_size):
|
|
|
|
+ name = None
|
|
|
|
+ try:
|
|
|
|
+ name = match['name']
|
|
|
|
+ except KeyError:
|
|
|
|
+ pass
|
|
|
|
+
|
|
results.append({
|
|
results.append({
|
|
'confidence': match['confidence'],
|
|
'confidence': match['confidence'],
|
|
'timestamp': ts,
|
|
'timestamp': ts,
|
|
- 'offset': match['offset']
|
|
|
|
|
|
+ 'offset': match['offset'],
|
|
|
|
+ 'name': name
|
|
})
|
|
})
|
|
values.append(str(match['confidence']))
|
|
values.append(str(match['confidence']))
|
|
|
|
|
|
ts += match['length'] / 1000
|
|
ts += match['length'] / 1000
|
|
|
|
|
|
- log.info('[process_segment] [{3}] {2} {0}) {1}'.format(
|
|
|
|
|
|
+ v.append(','.join(values))
|
|
|
|
+ log.info('[process_segment] [{2}] {0}) {1}'.format(
|
|
os.path.split(path)[-1],
|
|
os.path.split(path)[-1],
|
|
','.join(values),
|
|
','.join(values),
|
|
- item['ad'],
|
|
|
|
station,
|
|
station,
|
|
))
|
|
))
|
|
|
|
|
|
@@ -293,17 +302,12 @@ def process_segment(item, audios=None, calibration=None):
|
|
log.error('[process_segment] {}'.format(ex))
|
|
log.error('[process_segment] {}'.format(ex))
|
|
|
|
|
|
try:
|
|
try:
|
|
- response = client.put_schedule_results(
|
|
|
|
- item['schedule'],
|
|
|
|
- item['id'],
|
|
|
|
- None, # TODO: send results again
|
|
|
|
- found=find_repetitions(results,
|
|
|
|
- segments_needed=segments_needed,
|
|
|
|
- calibration=calibration,
|
|
|
|
- ),
|
|
|
|
- missing_files=(12 - audios_counter) \
|
|
|
|
- if audios_counter < 12 else 0
|
|
|
|
- )
|
|
|
|
|
|
+ for i in item["elementos"]:
|
|
|
|
+ r = [result for result in results if result["name"] == i["anuncio"]]
|
|
|
|
+ i['encontrados'] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration,)
|
|
|
|
+
|
|
|
|
+ item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
|
|
|
|
+ response = send_results(item)
|
|
log.info('[{}] API response: {}'.format(station, response))
|
|
log.info('[{}] API response: {}'.format(station, response))
|
|
except ConnectionError as ex:
|
|
except ConnectionError as ex:
|
|
log.error('[process_segment] {}'.format(str(ex)))
|
|
log.error('[process_segment] {}'.format(str(ex)))
|
|
@@ -370,7 +374,8 @@ def find_repetitions(results, segments_needed=2, calibration=None):
|
|
expect_space = False
|
|
expect_space = False
|
|
|
|
|
|
if found_counter >= segments_needed:
|
|
if found_counter >= segments_needed:
|
|
- found.append(results[found_index]['timestamp'])
|
|
|
|
|
|
+ found_row = results[found_index]
|
|
|
|
+ found.append(found_row)
|
|
found_counter = 0
|
|
found_counter = 0
|
|
expect_space = True
|
|
expect_space = True
|
|
|
|
|
|
@@ -450,7 +455,8 @@ def cloud_download(ad_key=None):
|
|
if rc != 'mp3\n':
|
|
if rc != 'mp3\n':
|
|
subprocess.call(['mv', out_file, out_file + '.old'])
|
|
subprocess.call(['mv', out_file, out_file + '.old'])
|
|
subprocess.call(
|
|
subprocess.call(
|
|
- ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-f', 'mp3', out_file])
|
|
|
|
|
|
+ ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-codec:a', 'libmp3lame',
|
|
|
|
+ '-qscale:a', '2', '-f', 'mp3', out_file])
|
|
subprocess.call(['rm', '-rf', out_file + '.old'])
|
|
subprocess.call(['rm', '-rf', out_file + '.old'])
|
|
cloud_cache[ad_key] = tp
|
|
cloud_cache[ad_key] = tp
|
|
return tp
|
|
return tp
|
|
@@ -469,6 +475,26 @@ def download_file(file_path=None):
|
|
cursor.close()
|
|
cursor.close()
|
|
|
|
|
|
|
|
|
|
|
|
+def get_pendings():
|
|
|
|
+ url = 'https://api.fourier.audio/v1/calendario/pendiente?id={}'.format(config['device_id'], )
|
|
|
|
+ headers = {
|
|
|
|
+ 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
|
|
|
|
+ }
|
|
|
|
+ response = requests.get(url, headers=headers)
|
|
|
|
+ return response.json()
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def send_results(item):
|
|
|
|
+ url = 'https://api.fourier.audio/v1/calendario/resultado'
|
|
|
|
+ # url = "http://requestbin.net/r/1bcyvg91"
|
|
|
|
+ headers = {
|
|
|
|
+ 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
|
|
|
|
+ }
|
|
|
|
+ log.info('url: {}'.format(url))
|
|
|
|
+ response = requests.post(url, json=item, headers=headers)
|
|
|
|
+ return response
|
|
|
|
+
|
|
|
|
+
|
|
app = setup_endpoint(queue=queue)
|
|
app = setup_endpoint(queue=queue)
|
|
loop = IOLoop.current()
|
|
loop = IOLoop.current()
|
|
loop.add_callback(feed_queue)
|
|
loop.add_callback(feed_queue)
|