140 Commits 4b1faaa241 ... 0a915ce7b2

Author SHA1 Message Date
  Hugo Quijada 0a915ce7b2 Merge branch 'docker' of git.miralo.xyz:AudioValid/fourier-ondemand into docker 1 week ago
  Hugo Quijada a2097e578e Actualización de la comparación 1 week ago
  Hugo Quijada d0ec67079e Corrección de url 1 month ago
  Hugo Quijada d636278048 Obtener trabajo 1 month ago
  Hugo 557b958454 segmento y confianza fijo 3 years ago
  Hugo 9506f2b052 Correccion 3 years ago
  Hugo eb80fb08c4 Ahora no hay limite inferior para los resultados 3 years ago
  Hugo c2fbae068d Confianza variable 3 years ago
  Hugo b000d32ffb Correccion de la condicion para buscar anuncios 4 years ago
  Hugo 9bdb4906d3 Merge branch 'docker' of git.miralo.xyz:AudioValid/fourier-ondemand into docker 4 years ago
  Hugo 82723c9d63 Comparacion sin media 4 years ago
  Hugo 46d7cb361b Dockerignore y notebooks 4 years ago
  Hugo 10d2d6b895 Tamaño de segmento configurable 4 years ago
  Hugo 17885041a7 Filtrado de informacion que se envia al servidor 4 years ago
  Hugo 6c98908426 Guardar resultado de comparacion 4 years ago
  Hugo 94003ca8b8 Prueba con tamaño de segmento de 10 segundos 4 years ago
  Hugo fa1c5ca739 Correccion de url de resultados 4 years ago
  Hugo b484b35896 Estatus de la respuesta 4 years ago
  Hugo d44b891cdf Correccion de envio de resultados 4 years ago
  Hugo 76ab3dbde7 Correccion de url 4 years ago
  Hugo 315f62ce6d Imprimir resultados 4 years ago
  Hugo 550cbc831d Correccion del json 4 years ago
  Hugo 3e3a2d92c1 Correccion de resultados 4 years ago
  Hugo 1c7b37a44f Nueva version 4 years ago
  Hugo fa7837df36 Incluir la zona horaria en el contenedor 4 years ago
  Hugo 24293d7f34 Actualizar 'run.sh' 4 years ago
  Hugo 52fc0a1ec3 Actualizar 'run.sh' 4 years ago
  Hugo ba24292819 Actualizar 'run.sh' 4 years ago
  Hugo c7e62ef64e Actualizar 'run.sh' 4 years ago
  Hugo 0dc4c00765 Actualizar 'Dockerfile' 4 years ago
  Rolando Valenzuela 57cdc094c9 Merge branch 'docker' of https://git.miralo.xyz/AudioValid/fourier-ondemand into docker 4 years ago
  Rolando Valenzuela e725668981 Cambio a python 3 en dockerfile 4 years ago
  Rolando Valenzuela 2790fadd29 Actualización entrypoint 4 years ago
  Rolando Valenzuela 3f511f4fff Arreglo de variables 4 years ago
  Rolando Valenzuela f5aa1b4360 Dependencias para ejecutar en entorno docker 4 years ago
  Hugo 4c75a7fb88 No usar código para encontrar valores altos 5 years ago
  Fourier 5b55d8e1c3 Cambio en versión de google-auth 5 years ago
  hquijada f4c4e2f92e Nueva comparacion 5 years ago
  hquijada f1c8fbbde0 Arreglado la busqueda con muchas versiones de un audio 5 years ago
  Desarrollo 7e8331c118 Desarrollo 5 years ago
  Desarrollo f7e28f6f69 Correccion de versiones para python 2 5 years ago
  Desarrollo 408c3b005b Versiones de librerías para python 2 5 years ago
  Hugo 64d63ed75f Merge branch 'master' of git.miralo.xyz:AudioValid/fourier-ondemand 5 years ago
  Hugo 47843c5290 Take file from local if exists 5 years ago
  Hugo 4bef0b21eb Fix indentation 5 years ago
  Hugo 11f1ced844 Validate elements to queue 5 years ago
  Hugo 6ee2cbc45e Release candidate 5 years ago
  Hugo e7166e53d3 Compare multiple ads 5 years ago
  Hugo 820c2f6827 Download file to compare 6 years ago
  Hugo 7ba3d16e47 Use single queue 6 years ago
  Hugo 0ccdfe8a21 Restart instance with each comparison 6 years ago
  Hugo Quijada a5e147dcad delete the log because it causing an error 6 years ago
  Hugo Quijada d7a5180478 add returncode to log 6 years ago
  Hugo Quijada a91c2e57c6 Coonvert add to mp3 when it is not 6 years ago
  Gamaliel Espinoza 72b4216a36 Fall tolereance segments are counted only if it's recovered 6 years ago
  Gamaliel Espinoza e13d6590ea More fixes 6 years ago
  Gamaliel Espinoza 548cc41313 Fixed exception name 6 years ago
  Gamaliel Espinoza d1e6de094d Ooops 6 years ago
  Gamaliel Espinoza e5991a6a68 Conflicto resuelto 6 years ago
  Gamaliel Espinoza e94ddc354f Except for CouldntDecodeError 6 years ago
  Gamaliel Espinoza 589fc05fc7 Version bump 6 years ago
  Gamaliel Espinoza ab635babb0 Fixed mp3 error log 6 years ago
  Gamaliel Espinoza Macedo 962f99254d rollback 6 years ago
  Gamaliel Espinoza Macedo ee587a93e1 rayos 6 years ago
  Gamaliel Espinoza Macedo 8be45e283f empty indexing 6 years ago
  Gamaliel Espinoza Macedo 29057cce8f Fuck 6 years ago
  Gamaliel Espinoza Macedo bfde0f4b89 Second threshold mode 6 years ago
  Gamaliel Espinoza Macedo 69c1b0e590 Awanta 6 years ago
  Gamaliel Espinoza 47812270e1 Tolerance 6 years ago
  Gamaliel Espinoza 1e1e58291e Path fixed 6 years ago
  Gamaliel Espinoza 129177592f Fixed log 6 years ago
  Gamaliel Espinoza d262669e10 Station 6 years ago
  Gamaliel Espinoza 2998df90a4 Imprved log 6 years ago
  Gamaliel Espinoza ef12945848 threshold and more sensitibity 6 years ago
  Gamaliel Espinoza Macedo e9f038b245 Threshold 6 years ago
  Gamaliel Espinoza Macedo 931d457b85 Better logging 6 years ago
  Gamaliel Espinoza Macedo 1a5102c761 THRESHOLD 6 years ago
  Gamaliel Espinoza Macedo e8c5f63d43 Yes 6 years ago
  Gamaliel Espinoza Macedo 5fae2abe8f Multiprocessing 6 years ago
  Gamaliel Espinoza Macedo 3de02bd767 Fixes and fixes 6 years ago
  Gamaliel Espinoza Macedo 9cea3c603a Reptition problems 6 years ago
  Gamaliel Espinoza Macedo 1351bd731a Target parameter fixed 6 years ago
  Gamaliel Espinoza Macedo 7d509a4472 More stuff about threaded segment processing 6 years ago
  Gamaliel Espinoza Macedo edc101f9b7 more logging 6 years ago
  Gamaliel Espinoza Macedo 83475eedb1 Loggin and stuff 6 years ago
  Gamaliel Espinoza da41ae42db Missing mutagen in depdencies 6 years ago
  Gamaliel Espinoza b2f5e9cb16 Directorio de base de datos custom 6 years ago
  Gamaliel Espinoza 4e99b20f1a Less than 1 segment protecion 6 years ago
  Gamaliel Espinoza 9bdef17eec Version bump 6 years ago
  Gamaliel Espinoza 33f06d4898 Repetitions needed 6 years ago
  Gamaliel Espinoza eec5249582 Version bump 7 years ago
  Gamaliel Espinoza 767b28ae16 Missing files 7 years ago
  Gamaliel Espinoza 16c997c6bd more dependencies 7 years ago
  Gamaliel Espinoza 594730ab52 added tornado as dependency 7 years ago
  Gamaliel Espinoza f9cb3e6f2c readme 7 years ago
  Gamaliel Espinoza bea537cb1e refreshed dependencies 7 years ago
  Gamaliel Espinoza 45aed1b7f0 fixed module name 7 years ago
  Gamaliel Espinoza 9fc61fc1ee binary added 7 years ago
  Gamaliel Espinoza a980a2034b absolute import in service 7 years ago
  Gamaliel Espinoza f390d72b03 restored endpoint in python 7 years ago
  Gamaliel Espinoza 10773f39ce tolerance compensation to the end of hour 7 years ago
  Gamaliel Espinoza 85104f2151 incremented tolerance to 2 seconds 7 years ago
  Gamaliel Espinoza 9844d85361 audios will be queried from database with 1 second of tolerance to past hour 7 years ago
  Gamaliel Espinoza 899a2a4f79 version bump 7 years ago
  Gamaliel Espinoza 90576eb210 fixed ad filter to prevent recognizing all audios fingerprinted in box 7 years ago
  Gamaliel Espinoza 5de96c62ad if a file is already downloaded will not be downloaded in the process of running until the service is restarted 7 years ago
  Gamaliel Espinoza a807709465 implemented repetitions 7 years ago
  Gamaliel Espinoza 0c60ba9233 recognizing 7 years ago
  Gamaliel Espinoza 2b9aa11e50 removed result reporting temporally 7 years ago
  Gamaliel Espinoza 064e6d7064 results print 7 years ago
  Gamaliel Espinoza 63b9c6359e results print 7 years ago
  Gamaliel Espinoza d9f4c7c4b3 print for server response 7 years ago
  Gamaliel Espinoza cca4ebf8d7 fixed 1 hour period of audios 7 years ago
  Gamaliel Espinoza 4b767ddb65 from date to date 7 years ago
  Gamaliel Espinoza ddd58fca9a files found print 7 years ago
  Gamaliel Espinoza 737052ab2c print of filename 7 years ago
  Gamaliel Espinoza 67cd3320db fixed length 7 years ago
  Gamaliel Espinoza b5d53aba95 Fixed confidence count 7 years ago
  Gamaliel Espinoza 7a59d68412 beta version bump 7 years ago
  Gamaliel Espinoza ee9e88e8dc removed unused code 7 years ago
  Gamaliel Espinoza 3923e2def2 requirementst.txr 7 years ago
  Gamaliel Espinoza c1246d10a3 fixed installer 7 years ago
  Gamaliel Espinoza 6ca33739e2 fixed installer 7 years ago
  Gamaliel Espinoza 0a94c2dc96 fixes 7 years ago
  Gamaliel Espinoza cf3f4b27b6 bump version 7 years ago
  Gamaliel Espinoza 816b638715 python 7 years ago
  Gamaliel Espinoza 5ac8227262 python 7 years ago
  Gamaliel Espinoza 92c99fb08d added unbuffered call 7 years ago
  Gamaliel Espinoza 543929df58 stable version 7 years ago
  Gamaliel Espinoza 5463c4aa01 Fixed moment require 7 years ago
  Gamaliel Espinoza 290c58955f started and ended recognizing 7 years ago
  Gamaliel Espinoza c27e1337ae stderr is redirected to stderr, it was stderr->stdout before 7 years ago
  Gamaliel Espinoza 1e0b04f41f Queue ordering and lenght for monitoring 7 years ago
  Gamaliel Espinoza a97e543189 fixed the return code and added better logging 7 years ago
  Gamaliel Espinoza 7524cb9050 stable version 7 years ago
  Gamaliel Espinoza 94cefdf8aa wrong package.json 7 years ago
  Gamaliel Espinoza 3a6a950675 missgin version 7 years ago
  Gamaliel Espinoza c4c5b6428d missing endpoint variable 7 years ago
  Gamaliel Espinoza 08d4d8faf5 monitoring support 7 years ago
  Gamaliel Espinoza 1a5f31ac7f Initial commit 7 years ago
1 changed files with 246 additions and 228 deletions
  1. 246 228
      ondemand/ondemand.py

+ 246 - 228
ondemand/ondemand.py

@@ -1,267 +1,285 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
 from __future__ import print_function, absolute_import
 import warnings
 warnings.simplefilter("ignore", UserWarning)
 
-from tornado.ioloop import IOLoop
-from boxconfig import parse_config
-from dejavu.recognize import FilePerSecondRecognizer
-from dejavu import Dejavu
-from endpoint import setup_endpoint
-import logging as log
-import requests
+import os
+import sys
 import json
 import time
-import os
+import random
+import signal
+import logging as log
+from urllib.parse import quote as urlquote
 
-from queue import Queue, Empty
+import requests
+from requests.adapters import HTTPAdapter, Retry
 
-log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
+from boxconfig import parse_config
+from dejavu import Dejavu
+from dejavu.recognize import FilePerSecondRecognizer
 
+# =========================
+# Configuración general
+# =========================
 PATH = '/tmp'
+API_NEXT = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
+API_POST = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
+
+DEFAULT_TIMEOUT = 10
+SLEEP_MIN, SLEEP_MAX = 0.8, 2.5   # backoff base cuando no hay trabajo
+ERR_MIN, ERR_MAX   = 2.0, 6.0     # backoff cuando hay error
+RUNNING = True
+
+log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
+
+
 config = parse_config()
-queue = Queue()
 recognizer = FilePerSecondRecognizer
 
+# Sesión HTTP robusta (reintentos + backoff)
+session = requests.Session()
+retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(429, 500, 502, 503, 504))
+session.mount("https://", HTTPAdapter(max_retries=retries))
 
-def obt_siguiente_trabajo():
-    url = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
-    response = requests.get(url)
-    return response.json()
-
-
-def descargar_anuncio(ad_path):
-    anuncio = os.path.basename(ad_path)
-    path = os.path.join(PATH, 'ads')
-    os.makedirs(path, exist_ok=True)
-    ruta_anuncio = os.path.join(path, anuncio)
-
-    if os.path.isfile(ruta_anuncio):
-        return ruta_anuncio
-    cloud_base_url = 'https://storage.googleapis.com/{}' \
-        .format(config['bucket'])
-    url = '{}/{}'.format(cloud_base_url, ad_path)
-    response = requests.get(url)
-
-    # TODO: Agregar alerta cuando la respuesta no sea 200
-    if response.status_code == 200:
-        with open(ruta_anuncio, "wb") as fp:
-            fp.write(response.content)
-            return ruta_anuncio
 
-    else:
-        log.info("[Anuncio][error] %s" % (response.text))
+# =========================
+# Utilidades HTTP
+# =========================
+def http_get(url, **kw):
+    return session.get(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
+
+def http_post(url, **kw):
+    return session.post(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
+
+
+# =========================
+# API servidor
+# =========================
+def obt_siguiente_trabajo():
+    r = http_get(API_NEXT)
+    if r.status_code == 204:
+        return None
+    r.raise_for_status()
+    data = r.json()
+    if not data or len(data.get("elementos", [])) == 0:
         return None
+    return data
+
+def enviar_resultados(payload):
+    r = http_post(API_POST, json=payload)
+    log.info("POST resultado %s %s", r.status_code, r.text[:300])
+    r.raise_for_status()
+    return r
+
+# =========================
+# Descargas
+# =========================
+def descargar_anuncio(ad_path, save_as=None):
+    """
+    Descarga de GCS. Guarda con save_as (tu campo 'anuncio' ya limpio) para llave 1:1.
+    """
+    filename = save_as or os.path.basename(ad_path)
+    out_dir = os.path.join(PATH, 'ads')
+    os.makedirs(out_dir, exist_ok=True)
+    out_file = os.path.join(out_dir, filename)
+
+    if os.path.isfile(out_file):
+        return out_file
+
+    cloud = f"https://storage.googleapis.com/{config['bucket']}"
+    url = f"{cloud}/{urlquote(ad_path, safe='/')}"  # URL-encode del path
+    resp = http_get(url)
+    if resp.status_code == 200:
+        with open(out_file, "wb") as fp:
+            fp.write(resp.content)
+        return out_file
+
+    log.warning("[Anuncio][%s] %s", resp.status_code, resp.text)
+    return None
 
 
 def descargar_media(box, station, media):
-    ref = '{}/{}/{}'.format(box, station, media)
+    """
+    Descarga desde Firebase Storage (según tu patrón original).
+    """
+    ref = f"{box}/{station}/{media}"
     file = os.path.basename(ref)
-    path = os.path.join(PATH, 'fourier', box, station)
-    os.makedirs(path, exist_ok=True)
-    out_file = os.path.join(path, file)
+    out_dir = os.path.join(PATH, 'fourier', box, station)
+    os.makedirs(out_dir, exist_ok=True)
+    out_file = os.path.join(out_dir, file)
 
     if os.path.isfile(out_file):
         return out_file
 
-    filename = ref.replace("/","%2F") \
-        .replace("+","%2B")
-    cloud_base_url = '%s%s' % (
-        'https://firebasestorage.googleapis.com',
-        '/v0/b/fourier-6e14d.appspot.com/o'
-    )
-    url = '{}/{}?alt=media'.format(cloud_base_url, filename)
-    response = requests.get(url)
-
-    if response.status_code == 200:
+    filename = ref.replace("/", "%2F").replace("+", "%2B")
+    base = "https://firebasestorage.googleapis.com/v0/b/fourier-6e14d.appspot.com/o"
+    url = f"{base}/{filename}?alt=media"
+    resp = http_get(url)
+    if resp.status_code == 200:
         with open(out_file, "wb") as fp:
-            fp.write(response.content)
-            return out_file
-    else:
-        log.info("[Media][url] %s" % (response.text))
-        log.info("[Media][error] %s" % (response.text))
-        return None
-
+            fp.write(resp.content)
+        return out_file
 
-def enviar_resultados(trabajo):
-    print('[Pendiente] %s' % (json.dumps(trabajo),))
-    url = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
-    response = requests.post(url, json=trabajo)
-    print('[Response] %s' % (response.text))
-    return response
-
-
-def llenar_pila():
-    """ Search for pending scheduled work in
-    server and add them to a memory queue. """
-    try:
-        response = obt_siguiente_trabajo()
-        if len(response["elementos"]) > 0:
-            queue.put(response)
-
-        if queue.qsize() > 0:
-            loop.add_callback(procesar_siguiente_pila)
-        else:
-            loop.add_timeout(time.time() + 30, llenar_pila)
-
-    except Exception as ex:
-        """ Errores desconocidos """
-        log.error('[feed_queue] {}'.format(ex))
-        loop.add_timeout(time.time() + 60, llenar_pila)
-        raise ex
-
-
-def procesar_siguiente_pila():
-    """ Try to the next item in a queue and start
-    processing it accordingly. If success, repeat
-    the function or go to feed if no more items. """
-    try:
-        item = queue.get(False)
-        procesar_trabajo(item)
-        loop.add_callback(procesar_siguiente_pila)
-    except Empty:
-        loop.add_callback(llenar_pila)
-    except Exception as ex:
-        log.error(ex)
-        loop.add_callback(procesar_siguiente_pila)
+    log.warning("[Media][%s] %s", resp.status_code, resp.text)
+    return None
 
 
+# =========================
+# Core de procesamiento
+# =========================
 def procesar_trabajo(pendiente):
-    ciudad = pendiente['origen']
+    """
+    - Mapea por elem['anuncio'] (valor ya limpio sin 'anuncios/').
+    - Huella media con Dejavu (name = timestamp base como string).
+    - Reconoce cada anuncio y consolida hits en ventana de 30s.
+    - Reporta resultados.
+    """
+    ciudad   = pendiente['origen']
     estacion = pendiente['estacion']
-    confianza = 35
-    segmento = 5
-    #if "segmento" in pendiente:
-    #    segmento = int(pendiente["segmento"])
-
-    #if "confianza" in pendiente:
-    #    confianza = int(pendiente["confianza"])
-
-    # Descarga de anuncios
-    log.info("Descargando anuncios")
-    try:
-        anuncios = []
-        id_by_ad = {}
-        item_ids = []
-        for i in pendiente["elementos"]:
-            id_by_ad[i['anuncio']] = i['id']
-            if i['id'] not in item_ids:
-                item_ids.append(i['id'])
-
-            anuncio = descargar_anuncio(i["ruta"])
-            if anuncio is not None:
-                anuncios.append(anuncio)
-
-    except Exception as err:
-        log.info('[process_segment] [{}] {}'.format(estacion, err))
-
-    # Descarga de media
-    log.info("Descargando media")
-    try:
-        media = []
-        for i in pendiente["media"]:
-            archivo = descargar_media(ciudad, estacion, i["ruta"])
-            if archivo is not None:
-                media.append((archivo, i["fecha"], i["timestamp"]))
-
-    except Exception as err:
-        log.info(err)
-
-    log.info("Inicia la comparacion, tamaño de segmento %s" % (segmento,))
-    try:
-        dejavu = None
-        resultados = {}
-        aux = {}
-        if len(media) > 0 and len(anuncio) > 0:
-            dejavu = Dejavu({"database_type": "mem"})
+
+    # --- Descarga anuncios
+    anuncios_paths = []
+    id_by_ad = {}
+    for elem in pendiente.get("elementos", []):
+        ad_key = elem["anuncio"]  # limpio desde PostgreSQL (sin prefijo 'anuncios/')
+        id_by_ad[ad_key] = elem["id"]
+
+        ad_file = descargar_anuncio(elem["ruta"], save_as=ad_key)
+        if ad_file:
+            anuncios_paths.append(ad_file)
+
+    # --- Descarga media
+    media = []
+    for m in pendiente.get("media", []):
+        f = descargar_media(ciudad, estacion, m["ruta"])
+        if f:
+            try:
+                ts = int(m["timestamp"])
+            except Exception:
+                ts = 0
+            media.append((f, m["fecha"], ts))
+
+    log.info("Comparación: media=%d anuncios=%d", len(media), len(anuncios_paths))
+
+    djv = None
+    resultados = {}
+    aux = {}
+
+    if len(media) > 0 and len(anuncios_paths) > 0:
+        djv = Dejavu({"database_type": "mem"})
+
+        # Fingerprint de cada archivo de media; usamos el timestamp como 'name'
+        for ruta, fecha, ts in media:
             try:
-                x = 0
-                for ruta, fecha, ts in media:
-                    log.info("Huellando %s" % (ruta,))
-                    dejavu.fingerprint_file(ruta, ts)
+                djv.fingerprint_file(ruta, str(ts))
             except Exception as ex:
-                log.info(ex)
-
-            for anuncio in anuncios:
-                for i in dejavu.recognize(recognizer, anuncio, 5):
-                    if not "id" in i:
-                        continue
-
-                    nombre_anuncio = os.path.split(anuncio)[-1]
-                    id = id_by_ad[nombre_anuncio]
-                    if id not in resultados:
-                        resultados[id] = []
-
-                    obj = i
-                    obj["match_time"] = None
-                    dict = {
-                        "id": id,
-                        "anuncio": anuncio,
-                        "fecha": obj["name"],
-                        "timestamp": obj["name"] + int(obj['offset_seconds']),
-                        "confianza": obj["confidence"],
-                        "longitud": obj["length"],
-                        "desfase_segundos": obj["offset_seconds"]
-                    }
-                    resultados[id].append(dict)
-
-            for k in resultados.keys():
-                lista = sorted(resultados[k], key=lambda d: d['timestamp'])
-                lista_nueva = []
-                ult = None
-                for x in range(0, len(lista)):
-                    if x == 0:
-                        ult = x
-                        lista_nueva.append(lista[ult])
+                log.warning("Fingerprint error %s: %r", ruta, ex)
+
+        # Reconocimiento por anuncio
+        for ad_path in anuncios_paths:
+            ad_key = os.path.basename(ad_path)  # = elem['anuncio']
+            hits = djv.recognize(FilePerSecondRecognizer, ad_path, 5) or []
+            for hit in hits:
+                if "id" not in hit:
+                    continue
+                try:
+                    base_ts = int(hit["name"])
+                except Exception:
+                    base_ts = 0
+                ts_match = base_ts + int(hit.get("offset_seconds", 0))
+
+                ad_id = id_by_ad.get(ad_key)
+                if ad_id is None:
+                    log.warning("Sin id para anuncio %s", ad_key)
+                    continue
+
+                entry = {
+                    "id": ad_id,
+                    "anuncio": ad_key,
+                    "fecha": hit["name"],                      # ts base como string
+                    "timestamp": ts_match,                     # entero
+                    "confianza": int(hit.get("confidence", 0)),
+                    "longitud": int(hit.get("length", 0)),     # ms según lib
+                    "desfase_segundos": int(hit.get("offset_seconds", 0)),
+                }
+                resultados.setdefault(ad_id, []).append(entry)
+
+        # Consolidación por ventana de 30s (suma confianza/longitud)
+        for k, lista in resultados.items():
+            lista.sort(key=lambda d: d['timestamp'])
+            merged = []
+            for i, it in enumerate(lista):
+                if i == 0:
+                    merged.append(it)
+                else:
+                    if it['timestamp'] - lista[i-1]['timestamp'] <= 30:
+                        merged[-1]['confianza'] += it['confianza']
+                        merged[-1]['longitud']  += it['longitud']
                     else:
-                        dif = lista[x]['timestamp'] - lista[x - 1]['timestamp']
-                        if dif <= 30:
-                            lista_nueva[ult]['confianza'] = int(lista_nueva[ult]['confianza']) + int(lista[x]['confianza'])
-                            lista_nueva[ult]['longitud'] = int(lista_nueva[ult]['longitud']) +int(lista[x]['longitud'])
-                        else:
-                            lista_nueva.append(lista[x])
-                            ult = len(lista_nueva) - 1
+                        merged.append(it)
+            aux[k] = merged
 
-                aux[k] = lista_nueva
+    else:
+        # TODO: Agregar más información del problema
+        for i in pendiente.get('elementos', []):
+            i['comentario'] = 'Problemas técnicos (media/anuncios vacíos)'
 
-        else:
+    # Ensamblado de respuesta
+    for ad_id, lista in aux.items():
+        for e in lista:
             for i in pendiente['elementos']:
-                i['comentario'] = 'Problemas técnicos'
-
-        for id in aux:
-            for e in aux[id]:
-                for i in pendiente['elementos']:
+                if i['id'] == e['id'] and i['anuncio'] == e['anuncio']:
+                    i.setdefault('encontrados', []).append({
+                        "fecha": e["fecha"],
+                        "anuncio": e["anuncio"],
+                        "longitud": int(e["longitud"] / 1000),  # a segundos
+                        "confianza": e["confianza"],
+                        "timestamp": e["timestamp"],
+                        "desfase_segundos": e["desfase_segundos"]
+                    })
                     i['comentario'] = ''
-                    anuncio = e['anuncio'].replace('/tmp/ads/', '')
-                    if i['id'] == e['id'] and i['anuncio'] == anuncio:
-                        if 'encontrados' not in i:
-                            i['encontrados'] = []
-                        obj = {
-                            "fecha": e["fecha"],
-                            "anuncio": anuncio,
-                            "longitud": int(e["longitud"] / 1000),
-                            "confianza": e["confianza"],
-                            "timestamp": e["timestamp"],
-                            "desfase_segundos": e["desfase_segundos"]
-                        }
-                        i['encontrados'].append(obj)
-                        break
-
-        # log.info(json.dumps(extras))
-        log.info("[Resultado] %s" % (json.dumps(resultados)))
-        pendiente["media"] = None
-        enviar_resultados(pendiente)
-
-    except Exception as ex:
-        log.info(ex)
-
-
-app = setup_endpoint(queue=queue)
-loop = IOLoop.current()
-loop.add_callback(llenar_pila)
+                    break
+
+    # No subimos media completa de vuelta
+    pendiente["media"] = len(pendiente.get("media", [])) 
+    enviar_resultados(pendiente)
+
+
+def _stop(*_):
+    global RUNNING
+    RUNNING = False
+
+
+def main_loop():
+    worker_id = f"{os.uname().nodename}:{os.getpid()}" if hasattr(os, "uname") else f"pid:{os.getpid()}"
+    log.info("Worker pull iniciado (%s)", worker_id)
+
+    # Señales para detener con gracia
+    signal.signal(signal.SIGTERM, _stop)
+    signal.signal(signal.SIGINT,  _stop)
+
+    while RUNNING:
+        try:
+            job = obt_siguiente_trabajo()
+            if not job:
+                time.sleep(random.uniform(SLEEP_MIN, SLEEP_MAX))
+                continue
+
+            procesar_trabajo(job)
+
+        except requests.exceptions.RequestException as net_ex:
+            log.warning("Red/API: %r", net_ex)
+            time.sleep(random.uniform(ERR_MIN, ERR_MAX))
+        except Exception as ex:
+            log.exception("Error procesando trabajo: %r", ex)
+            time.sleep(random.uniform(ERR_MIN, ERR_MAX))
+
+    log.info("Worker detenido con gracia")
+    return 0
+
 
 if __name__ == '__main__':
-    try:
-        log.info('Starting ondemand service')
-        loop.start()
-    except KeyboardInterrupt:
-        log.error('Process killed')
+    sys.exit(main_loop())