"""On-demand ad-detection worker.

Fetches pending "calendario" jobs from the Fourier API, downloads the ads
and media recordings each job references from Google Cloud Storage,
fingerprints the media into an in-memory Dejavu database, searches each ad
in it, and posts the matches back to the API. Jobs are kept in an in-memory
queue that is also handed to ``setup_endpoint`` (presumably so the HTTP
endpoint can push work onto it too — confirm in ``endpoint``).
"""
from __future__ import print_function, absolute_import

import warnings

# Silence UserWarnings before the third-party imports below so that warnings
# emitted while those modules load are suppressed as well.
warnings.simplefilter("ignore", UserWarning)

from tornado.ioloop import IOLoop
from boxconfig import parse_config
from dejavu.recognize import FilePerSecondRecognizer
from dejavu import Dejavu
from endpoint import setup_endpoint
import logging as log
import requests
import json
import time
import os
from queue import Queue, Empty

log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)

# Local cache root: ads go to PATH/ads, media to PATH/fourier/<box>/<station>.
PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # second
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

QUEUE_SINGLE = 1

# Seconds to wait for connect / between socket reads on every HTTP call, so a
# stalled server cannot block the IOLoop callback (and thus the worker) forever.
REQUEST_TIMEOUT = 60
# Delay (seconds) before polling again when there is no pending work.
POLL_INTERVAL = 30
# Delay (seconds) before polling again after an unexpected error.
RETRY_INTERVAL = 60
# Recognition hits with a confidence below this value are discarded.
MIN_CONFIDENCE = 50
# Third argument passed to Dejavu.recognize(); its meaning is defined by the
# dejavu fork in use (presumably a segment length in seconds — TODO confirm).
RECOGNIZE_SEGMENT = 5

config = parse_config()
queue = Queue()
cloud_base_url = 'https://storage.googleapis.com/{}'.format(config['bucket'])
recognizer = FilePerSecondRecognizer
threshold_mode = THRESHOLD_FIXED


def obt_siguiente_trabajo():
    """Ask the API for the next pending job of this device.

    Returns the decoded JSON body. Raises ``requests.HTTPError`` on a non-2xx
    status (instead of trying to decode an error page) and
    ``requests.RequestException`` on network errors/timeouts.
    """
    url = 'https://api.fourier.audio/na/calendario/pendiente'
    response = requests.get(url, params={'id': config['device_id']},
                            timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _descargar(url, destino):
    """Download ``url`` into the file ``destino``.

    The body is first written to ``destino + '.part'`` and renamed into place
    only once complete, so an interrupted write never leaves a truncated file
    behind that the ``os.path.isfile`` cache checks would later trust.

    Returns ``destino`` on HTTP 200; otherwise logs the response and returns
    None. Network errors (including timeouts) propagate to the caller.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    # TODO: raise an alert when the response is not 200
    if response.status_code != 200:
        log.warning("Error al descargar")
        log.warning("%s: %s", url, response)
        return None
    temporal = destino + '.part'
    with open(temporal, "wb") as fp:
        fp.write(response.content)
    os.replace(temporal, destino)
    return destino


def descargar_anuncio(ad_path):
    """Return a local path for the ad stored at ``ad_path`` in the bucket.

    Ads are cached under PATH/ads by base name; a cached file is returned
    without any request. Returns None if the download gets a non-200 status.
    """
    anuncio = os.path.basename(ad_path)
    path = os.path.join(PATH, 'ads')
    os.makedirs(path, exist_ok=True)
    ruta_anuncio = os.path.join(path, anuncio)
    if os.path.isfile(ruta_anuncio):
        return ruta_anuncio
    url = '{}/{}'.format(cloud_base_url, ad_path)
    return _descargar(url, ruta_anuncio)


def descargar_media(box, station, media):
    """Return a local path for the object ``<box>/<station>/<media>``.

    Files are cached under PATH/fourier/<box>/<station>; a cached file is
    returned without any request. Returns None on a non-200 status.
    """
    ref = '{}/{}/{}'.format(box, station, media)
    path = os.path.join(PATH, 'fourier', box, station)
    os.makedirs(path, exist_ok=True)
    out_file = os.path.join(path, os.path.basename(ref))
    if os.path.isfile(out_file):
        return out_file
    # The object name is sent as a single path segment, so "/" and "+" are
    # percent-encoded.
    filename = ref.replace("/", "%2F").replace("+", "%2B")
    url = '{}/{}'.format(cloud_base_url, filename)
    return _descargar(url, out_file)


def obt_calibracion(calibracion):
    """Return the calibration settings, with ``calibracion`` over defaults.

    Only the four known keys (threshold, tolerance, fallTolerance,
    segmentSize) are read from ``calibracion``; other keys are ignored.
    A fresh dict is returned on every call.
    """
    default = {
        'threshold': 12,
        'tolerance': 0.8,
        'fallTolerance': 1,
        'segmentSize': 5,
    }
    for clave in default:
        if clave in calibracion:
            default[clave] = calibracion[clave]
    return default


def enviar_resultados(trabajo):
    """POST the processed job back to the API and return the response.

    A non-2xx answer is logged (the job results would otherwise be lost
    silently) but not raised.
    """
    url = 'https://api.fourier.audio/na/calendario/resultado'
    response = requests.post(url, json=trabajo, timeout=REQUEST_TIMEOUT)
    if not response.ok:
        log.warning('[send_results] {}'.format(response))
    return response


def llenar_pila():
    """Fetch pending work from the server and push it onto the memory queue.

    If the queue then holds anything, processing is scheduled right away;
    otherwise the poll is repeated after POLL_INTERVAL seconds. On any error
    the error is logged and the poll is retried after RETRY_INTERVAL seconds.
    """
    try:
        response = obt_siguiente_trabajo()
        if response["elementos"]:
            queue.put(response)
    except Exception as ex:
        # Unknown errors (network, bad JSON, missing keys): log and retry.
        log.exception('[feed_queue] {}'.format(ex))
        # call_later uses the IOLoop's own clock; an absolute deadline built
        # from time.time() is on the wrong scale when the loop's clock is
        # monotonic (asyncio-based Tornado) and would never fire.
        loop.call_later(RETRY_INTERVAL, llenar_pila)
        return
    if queue.qsize() > 0:
        loop.add_callback(procesar_siguiente_pila)
    else:
        loop.call_later(POLL_INTERVAL, llenar_pila)


def procesar_siguiente_pila():
    """Process the next queued job, then schedule this function again.

    When the queue is empty, control goes back to ``llenar_pila`` to fetch
    more work. A failing job is logged and does not stop the loop.
    """
    try:
        item = queue.get(False)
    except Empty:
        loop.add_callback(llenar_pila)
        return
    try:
        procesar_trabajo(item)
    except Exception as ex:
        log.exception(ex)
    loop.add_callback(procesar_siguiente_pila)


def _descargar_anuncios(elementos, estacion):
    """Download the ad of every elemento; one failure does not stop the rest.

    Returns ``(anuncios, id_by_ad)``: the local paths of the ads that
    downloaded, and a map from each elemento's ``'anuncio'`` name to its
    ``'id'`` (when several elementos share an ad name, the last one wins).
    """
    anuncios = []
    id_by_ad = {}
    for elemento in elementos:
        try:
            id_by_ad[elemento['anuncio']] = elemento['id']
            anuncio = descargar_anuncio(elemento["ruta"])
        except Exception as err:
            log.exception('[process_segment] [{}] {}'.format(estacion, err))
            continue
        if anuncio is not None:
            anuncios.append(anuncio)
    return anuncios, id_by_ad


def _descargar_medias(medias, ciudad, estacion):
    """Download every media item; returns ``(path, fecha, timestamp)`` tuples.

    Items that fail or get a non-200 status are skipped (failures logged).
    """
    archivos = []
    for item in medias:
        try:
            archivo = descargar_media(ciudad, estacion, item["ruta"])
            if archivo is not None:
                archivos.append((archivo, item["fecha"], item["timestamp"]))
        except Exception as err:
            log.exception(err)
    return archivos


def _buscar_anuncios(anuncios, media, id_by_ad):
    """Fingerprint ``media`` in a fresh in-memory Dejavu and search each ad.

    Returns a dict keyed by ``(ad file name, recognizer match "id")``. Hits
    for the same ad in the same match are merged by summing ``longitud`` and
    ``confianza``; keying on the ad as well keeps hits of *different* ads in
    the same media from being merged into one entry.
    """
    dejavu = Dejavu({"database_type": "mem"})
    for ruta, _fecha, ts in media:
        log.info("Huellando %s" % (ruta,))
        try:
            # The media timestamp is stored as the fingerprint "name"; it is
            # read back below as match["name"].
            dejavu.fingerprint_file(ruta, ts)
        except Exception as ex:
            log.exception(ex)

    resultados = {}
    for anuncio in anuncios:
        log.info("Buscando anuncio %s" % (anuncio,))
        nombre_anuncio = os.path.basename(anuncio)
        if nombre_anuncio not in id_by_ad:
            # No elemento carries this ad name, so no result could be
            # attached to it anyway; skip instead of aborting the whole job.
            log.warning('[process_job] sin elemento para {}'.format(nombre_anuncio))
            continue
        id_elemento = id_by_ad[nombre_anuncio]
        for match in dejavu.recognize(recognizer, anuncio, RECOGNIZE_SEGMENT):
            # Entries without "id" are treated as non-matches.
            if "id" not in match or match["confidence"] < MIN_CONFIDENCE:
                continue
            clave = (nombre_anuncio, match["id"])
            previo = resultados.get(clave)
            if previo is not None:
                previo["longitud"] += match["length"]
                previo["confianza"] += match["confidence"]
                continue
            resultados[clave] = {
                "id": id_elemento,
                "anuncio": anuncio,
                "fecha": match["name"],
                "timestamp": match["name"] + int(match['offset_seconds']),
                "confianza": match["confidence"],
                "longitud": match["length"],
                "desfase_segundos": match["offset_seconds"],
            }
    return resultados


def _adjuntar_resultados(elementos, resultados):
    """Append each result to the ``'encontrados'`` list of its elemento.

    A result belongs to the first elemento whose ``'id'`` equals the stored
    elemento id and whose ``'anuncio'`` equals the result's ad file name.
    """
    for resultado in resultados.values():
        nombre_anuncio = os.path.basename(resultado['anuncio'])
        for elemento in elementos:
            if (elemento['id'] == resultado['id']
                    and elemento['anuncio'] == nombre_anuncio):
                elemento.setdefault('encontrados', []).append(resultado)
                break


def procesar_trabajo(pendiente):
    """Run one job: download ads and media, search the ads, post results.

    ``pendiente`` is mutated in place: matching results are added to its
    elementos under ``'encontrados'`` and the whole job is then sent with
    ``enviar_resultados``. Errors during search/sending are logged.
    """
    ciudad = pendiente['origen']
    estacion = pendiente['estacion']
    anuncios, id_by_ad = _descargar_anuncios(pendiente["elementos"], estacion)
    media = _descargar_medias(pendiente.get("media") or [], ciudad, estacion)
    try:
        resultados = _buscar_anuncios(anuncios, media, id_by_ad)
        _adjuntar_resultados(pendiente['elementos'], resultados)
        enviar_resultados(pendiente)
    except Exception as ex:
        log.exception(ex)


# Keep a module-level reference to whatever setup_endpoint returns
# (presumably the web application bound to the shared queue — confirm).
app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(llenar_pila)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')