123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- from __future__ import print_function, absolute_import
- import warnings
- warnings.simplefilter("ignore", UserWarning)
- from tornado.ioloop import IOLoop
- from boxconfig import parse_config
- from dejavu.recognize import FilePerSecondRecognizer
- from dejavu import Dejavu
- from endpoint import setup_endpoint
- import logging as log
- import requests
- import json
- import time
- import os
- from queue import Queue, Empty
- log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
- PATH = '/tmp'
- AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
- MAX_SEGMENT_THREADS = 4
- THRESHOLD = 10
- SEGMENTS_TOLERANCE_RATE = 0.6
- FALL_TOLERANCE_SEGMENTS = 1
- THRESHOLD_FIXED = 1
- THRESHOLD_AVERAGE = 2
- QUEUE_SINGLE = 1
- config = parse_config()
- queue = Queue()
- cloud_base_url = 'https://storage.googleapis.com/{}' \
- .format(config['bucket'])
- recognizer = FilePerSecondRecognizer
- threshold_mode = THRESHOLD_FIXED
- def obt_siguiente_trabajo():
- url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)
- response = requests.get(url)
- return response.json()
- def descargar_anuncio(ad_path):
- anuncio = os.path.basename(ad_path)
- path = os.path.join(PATH, 'ads')
- os.makedirs(path, exist_ok=True)
- ruta_anuncio = os.path.join(path, anuncio)
- if os.path.isfile(ruta_anuncio):
- return ruta_anuncio
- url = '{}/{}'.format(cloud_base_url, ad_path)
- response = requests.get(url)
- # TODO: Agregar alerta cuando la respuesta no sea 200
- if response.status_code == 200:
- with open(ruta_anuncio, "wb") as fp:
- fp.write(response.content)
- return ruta_anuncio
- else:
- log.info("Error al descargar")
- log.info(response)
- return None
- def descargar_media(box, station, media):
- ref = '{}/{}/{}'.format(box, station, media)
- file = os.path.basename(ref)
- path = os.path.join(PATH, 'fourier', box, station)
- os.makedirs(path, exist_ok=True)
- out_file = os.path.join(path, file)
- if os.path.isfile(out_file):
- return out_file
- filename = ref.replace("/","%2F") \
- .replace("+","%2B")
- url = '{}/{}'.format(cloud_base_url, filename)
- response = requests.get(url)
- if response.status_code == 200:
- with open(out_file, "wb") as fp:
- fp.write(response.content)
- return out_file
- else:
- log.info("Error al descargar")
- log.info(response)
- return None
- def obt_calibracion(calibracion):
- default = {
- 'threshold': 12,
- 'tolerance': 0.8,
- 'fallTolerance': 1,
- 'segmentSize': 5,
- }
- if 'threshold' in calibracion:
- default['threshold'] = calibracion['threshold']
- if 'tolerance' in calibracion:
- default['tolerance'] = calibracion['tolerance']
- if 'segmentSize' in calibracion:
- default['segmentSize'] = calibracion['segmentSize']
- if 'fallTolerance' in calibracion:
- default['fallTolerance'] = calibracion['fallTolerance']
- return default
- def enviar_resultados(trabajo):
- url = 'https://api.fourier.audio/na/calendario/resultado'
- response = requests.post(url, json=trabajo)
- return response
- def llenar_pila():
- """ Search for pending scheduled work in
- server and add them to a memory queue. """
- try:
- response = obt_siguiente_trabajo()
- if len(response["elementos"]) > 0:
- queue.put(response)
- if queue.qsize() > 0:
- loop.add_callback(procesar_siguiente_pila)
- else:
- loop.add_timeout(time.time() + 30, llenar_pila)
- except Exception as ex:
- """ Errores desconocidos """
- log.error('[feed_queue] {}'.format(ex))
- loop.add_timeout(time.time() + 60, llenar_pila)
- raise ex
- def procesar_siguiente_pila():
- """ Try to the next item in a queue and start
- processing it accordingly. If success, repeat
- the function or go to feed if no more items. """
- try:
- item = queue.get(False)
- procesar_trabajo(item)
- loop.add_callback(procesar_siguiente_pila)
- except Empty:
- loop.add_callback(llenar_pila)
- except Exception as ex:
- log.error(ex)
- loop.add_callback(procesar_siguiente_pila)
- def procesar_trabajo(pendiente):
- ciudad = pendiente['origen']
- estacion = pendiente['estacion']
- # Descarga de anuncios
- try:
- anuncios = []
- id_by_ad = {}
- item_ids = []
- for i in pendiente["elementos"]:
- id_by_ad[i['anuncio']] = i['id']
- if i['id'] not in item_ids:
- item_ids.append(i['id'])
- anuncio = descargar_anuncio(i["ruta"])
- if anuncio is not None:
- anuncios.append(anuncio)
- except Exception as err:
- log.info('[process_segment] [{}] {}'.format(estacion, err))
- # Descarga de media
- try:
- media = []
- for i in pendiente["media"]:
- archivo = descargar_media(ciudad, estacion, i["ruta"])
- if archivo is not None:
- media.append((archivo, i["fecha"], i["timestamp"]))
- except Exception as err:
- log.info(err)
- dejavu = None
- resultados = {}
- try:
- dejavu = Dejavu({"database_type": "mem"})
- try:
- x = 0
- for ruta, fecha, ts in media:
- log.info("Huellando %s" % (ruta,))
- dejavu.fingerprint_file(ruta, ts)
- except Exception as ex:
- log.info(ex)
- for anuncio in anuncios:
- log.info("Buscando anuncio %s" % (anuncio,))
- for i in dejavu.recognize(recognizer, anuncio, 5):
- if not "id" in i:
- continue
- if i["confidence"] < 50:
- continue
- obj = i
- obj["match_time"] = None
- nombre_anuncio = os.path.split(anuncio)[-1]
- id = id_by_ad[nombre_anuncio]
- dict = {
- "id": id,
- "anuncio": anuncio,
- "fecha": obj["name"],
- "timestamp": obj["name"] + int(obj['offset_seconds']),
- "confianza": obj["confidence"],
- "longitud": obj["length"],
- "desfase_segundos": obj["offset_seconds"]
- }
- if i["id"] in resultados.keys():
- resultados[i["id"]]["longitud"] = resultados[i["id"]]["longitud"] + dict["longitud"]
- resultados[i["id"]]["confianza"] = resultados[i["id"]]["confianza"] + dict["confianza"]
- continue
- resultados[i["id"]] = dict
- for id in resultados:
- e = resultados[id]
- for i in pendiente['elementos']:
- anuncio = e['anuncio'].replace('/tmp/ads/', '')
- if i['id'] == id and i['anuncio'] == anuncio:
- if 'encontrados' not in i:
- i['encontrados'] = []
- i['encontrados'].append(e)
- break
- enviar_resultados(pendiente)
- except Exception as ex:
- log.info(ex)
- app = setup_endpoint(queue=queue)
- loop = IOLoop.current()
- loop.add_callback(llenar_pila)
- if __name__ == '__main__':
- try:
- log.info('Starting ondemand service')
- loop.start()
- except KeyboardInterrupt:
- log.error('Process killed')
|