from __future__ import print_function, absolute_import import warnings warnings.simplefilter("ignore", UserWarning) from tornado.ioloop import IOLoop from boxconfig import parse_config from dejavu.recognize import FilePerSecondRecognizer from dejavu import Dejavu from endpoint import setup_endpoint import logging as log import requests import json import time import os from queue import Queue, Empty log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO) PATH = '/tmp' config = parse_config() queue = Queue() recognizer = FilePerSecondRecognizer def obt_siguiente_trabajo(): url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],) response = requests.get(url) return response.json() def descargar_anuncio(ad_path): anuncio = os.path.basename(ad_path) path = os.path.join(PATH, 'ads') os.makedirs(path, exist_ok=True) ruta_anuncio = os.path.join(path, anuncio) if os.path.isfile(ruta_anuncio): return ruta_anuncio cloud_base_url = 'https://storage.googleapis.com/{}' \ .format(config['bucket']) url = '{}/{}'.format(cloud_base_url, ad_path) response = requests.get(url) # TODO: Agregar alerta cuando la respuesta no sea 200 if response.status_code == 200: with open(ruta_anuncio, "wb") as fp: fp.write(response.content) return ruta_anuncio else: log.info("[Anuncio][error] %s" % (response.text)) return None def descargar_media(box, station, media): ref = '{}/{}/{}'.format(box, station, media) file = os.path.basename(ref) path = os.path.join(PATH, 'fourier', box, station) os.makedirs(path, exist_ok=True) out_file = os.path.join(path, file) if os.path.isfile(out_file): return out_file filename = ref.replace("/","%2F") \ .replace("+","%2B") cloud_base_url = '%s%s' % ( 'https://firebasestorage.googleapis.com', '/v0/b/fourier-6e14d.appspot.com/o' ) url = '{}/{}?alt=media'.format(cloud_base_url, filename) response = requests.get(url) if response.status_code == 200: with open(out_file, "wb") as fp: fp.write(response.content) return out_file else: log.info("[Media][url] %s" % (response.text)) log.info("[Media][error] %s" % (response.text)) return None def enviar_resultados(trabajo): log.info('[Pendiente] %s' % (json.dumps(trabajo),)) url = 'https://api.fourier.audio/v1/calendario/resultado' # response = requests.post(url, json=trabajo) # log.info('[Response] %s' % (response.text)) # return response def llenar_pila(): """ Search for pending scheduled work in server and add them to a memory queue. """ try: response = obt_siguiente_trabajo() if len(response["elementos"]) > 0: queue.put(response) if queue.qsize() > 0: loop.add_callback(procesar_siguiente_pila) else: loop.add_timeout(time.time() + 30, llenar_pila) except Exception as ex: """ Errores desconocidos """ log.error('[feed_queue] {}'.format(ex)) loop.add_timeout(time.time() + 60, llenar_pila) raise ex def procesar_siguiente_pila(): """ Try to the next item in a queue and start processing it accordingly. If success, repeat the function or go to feed if no more items. """ try: item = queue.get(False) procesar_trabajo(item) loop.add_callback(procesar_siguiente_pila) except Empty: loop.add_callback(llenar_pila) except Exception as ex: log.error(ex) loop.add_callback(procesar_siguiente_pila) def procesar_trabajo(pendiente): ciudad = pendiente['origen'] estacion = pendiente['estacion'] # Descarga de anuncios log.info("Descargando anuncios") try: anuncios = [] id_by_ad = {} item_ids = [] for i in pendiente["elementos"]: id_by_ad[i['anuncio']] = i['id'] if i['id'] not in item_ids: item_ids.append(i['id']) anuncio = descargar_anuncio(i["ruta"]) if anuncio is not None: log.info("Listo %s" % (i['ruta'],)) anuncios.append(anuncio) except Exception as err: log.info('[process_segment] [{}] {}'.format(estacion, err)) # Descarga de media log.info("Descargando media") try: media = [] for i in pendiente["media"]: archivo = descargar_media(ciudad, estacion, i["ruta"]) if archivo is not None: log.info("Listo %s %s %s" % (ciudad, estacion, i['ruta'],)) media.append((archivo, i["fecha"], i["timestamp"])) except Exception as err: log.info(err) if len(media) == 0 or len(anuncio) == 0: log.info("No hay media o anuncios para comparar") return dejavu = None resultados = {} try: dejavu = Dejavu({"database_type": "mem"}) try: x = 0 for ruta, fecha, ts in media: log.info("Huellando %s" % (ruta,)) dejavu.fingerprint_file(ruta, ts) except Exception as ex: log.info(ex) for anuncio in anuncios: log.info("Buscando anuncio %s" % (anuncio,)) for i in dejavu.recognize(recognizer, anuncio, 10): if not "id" in i: continue if i["confidence"] < 35: continue obj = i obj["match_time"] = None nombre_anuncio = os.path.split(anuncio)[-1] id = id_by_ad[nombre_anuncio] dict = { "id": id, "anuncio": anuncio, "fecha": obj["name"], "timestamp": obj["name"] + int(obj['offset_seconds']), "confianza": obj["confidence"], "longitud": obj["length"], "desfase_segundos": obj["offset_seconds"] } if id in resultados.keys(): resultados[id]["longitud"] += dict["longitud"] resultados[id]["confianza"] += dict["confianza"] continue resultados[id] = dict for id in resultados: e = resultados[id] for i in pendiente['elementos']: anuncio = e['anuncio'].replace('/tmp/ads/', '') if i['id'] == e['id'] and i['anuncio'] == anuncio: if 'encontrados' not in i: i['encontrados'] = [] i['encontrados'].append(e) break log.info("[Resultado] %s" % (json.dumps(resultados))) enviar_resultados(pendiente) except Exception as ex: log.info(ex) app = setup_endpoint(queue=queue) loop = IOLoop.current() loop.add_callback(llenar_pila) if __name__ == '__main__': try: log.info('Starting ondemand service') loop.start() except KeyboardInterrupt: log.error('Process killed')