123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from __future__ import print_function, absolute_import
- import warnings
- warnings.simplefilter("ignore", UserWarning)
- import os
- import sys
- import json
- import time
- import random
- import signal
- import logging as log
- from urllib.parse import quote as urlquote
- import requests
- from requests.adapters import HTTPAdapter, Retry
- from boxconfig import parse_config
- from dejavu import Dejavu
- from dejavu.recognize import FilePerSecondRecognizer
- # =========================
- # Configuración general
- # =========================
- PATH = '/tmp'
- API_NEXT = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
- API_POST = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
- DEFAULT_TIMEOUT = 10
- SLEEP_MIN, SLEEP_MAX = 0.8, 2.5 # backoff base cuando no hay trabajo
- ERR_MIN, ERR_MAX = 2.0, 6.0 # backoff cuando hay error
- RUNNING = True
- log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
- config = parse_config()
- recognizer = FilePerSecondRecognizer
- # Sesión HTTP robusta (reintentos + backoff)
- session = requests.Session()
- retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(429, 500, 502, 503, 504))
- session.mount("https://", HTTPAdapter(max_retries=retries))
- # =========================
- # Utilidades HTTP
- # =========================
- def http_get(url, **kw):
- return session.get(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
- def http_post(url, **kw):
- return session.post(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
- # =========================
- # API servidor
- # =========================
- def obt_siguiente_trabajo():
- r = http_get(API_NEXT)
- if r.status_code == 204:
- return None
- r.raise_for_status()
- data = r.json()
- if not data or len(data.get("elementos", [])) == 0:
- return None
- return data
- def enviar_resultados(payload):
- r = http_post(API_POST, json=payload)
- log.info("POST resultado %s %s", r.status_code, r.text[:300])
- r.raise_for_status()
- return r
- # =========================
- # Descargas
- # =========================
- def descargar_anuncio(ad_path, save_as=None):
- """
- Descarga de GCS. Guarda con save_as (tu campo 'anuncio' ya limpio) para llave 1:1.
- """
- filename = save_as or os.path.basename(ad_path)
- out_dir = os.path.join(PATH, 'ads')
- os.makedirs(out_dir, exist_ok=True)
- out_file = os.path.join(out_dir, filename)
- if os.path.isfile(out_file):
- return out_file
- cloud = f"https://storage.googleapis.com/{config['bucket']}"
- url = f"{cloud}/{urlquote(ad_path, safe='/')}" # URL-encode del path
- resp = http_get(url)
- if resp.status_code == 200:
- with open(out_file, "wb") as fp:
- fp.write(resp.content)
- return out_file
- log.warning("[Anuncio][%s] %s", resp.status_code, resp.text)
- return None
- def descargar_media(box, station, media):
- """
- Descarga desde Firebase Storage (según tu patrón original).
- """
- ref = f"{box}/{station}/{media}"
- file = os.path.basename(ref)
- out_dir = os.path.join(PATH, 'fourier', box, station)
- os.makedirs(out_dir, exist_ok=True)
- out_file = os.path.join(out_dir, file)
- if os.path.isfile(out_file):
- return out_file
- filename = ref.replace("/", "%2F").replace("+", "%2B")
- base = "https://firebasestorage.googleapis.com/v0/b/fourier-6e14d.appspot.com/o"
- url = f"{base}/{filename}?alt=media"
- resp = http_get(url)
- if resp.status_code == 200:
- with open(out_file, "wb") as fp:
- fp.write(resp.content)
- return out_file
- log.warning("[Media][%s] %s", resp.status_code, resp.text)
- return None
- # =========================
- # Core de procesamiento
- # =========================
- def procesar_trabajo(pendiente):
- """
- - Mapea por elem['anuncio'] (valor ya limpio sin 'anuncios/').
- - Huella media con Dejavu (name = timestamp base como string).
- - Reconoce cada anuncio y consolida hits en ventana de 30s.
- - Reporta resultados.
- """
- ciudad = pendiente['origen']
- estacion = pendiente['estacion']
- # --- Descarga anuncios
- anuncios_paths = []
- id_by_ad = {}
- for elem in pendiente.get("elementos", []):
- ad_key = elem["anuncio"] # limpio desde PostgreSQL (sin prefijo 'anuncios/')
- id_by_ad[ad_key] = elem["id"]
- ad_file = descargar_anuncio(elem["ruta"], save_as=ad_key)
- if ad_file:
- anuncios_paths.append(ad_file)
- # --- Descarga media
- media = []
- for m in pendiente.get("media", []):
- f = descargar_media(ciudad, estacion, m["ruta"])
- if f:
- try:
- ts = int(m["timestamp"])
- except Exception:
- ts = 0
- media.append((f, m["fecha"], ts))
- log.info("Comparación: media=%d anuncios=%d", len(media), len(anuncios_paths))
- djv = None
- resultados = {}
- aux = {}
- if len(media) > 0 and len(anuncios_paths) > 0:
- djv = Dejavu({"database_type": "mem"})
- # Fingerprint de cada archivo de media; usamos el timestamp como 'name'
- for ruta, fecha, ts in media:
- try:
- djv.fingerprint_file(ruta, str(ts))
- except Exception as ex:
- log.warning("Fingerprint error %s: %r", ruta, ex)
- # Reconocimiento por anuncio
- for ad_path in anuncios_paths:
- ad_key = os.path.basename(ad_path) # = elem['anuncio']
- hits = djv.recognize(FilePerSecondRecognizer, ad_path, 5) or []
- for hit in hits:
- if "id" not in hit:
- continue
- try:
- base_ts = int(hit["name"])
- except Exception:
- base_ts = 0
- ts_match = base_ts + int(hit.get("offset_seconds", 0))
- ad_id = id_by_ad.get(ad_key)
- if ad_id is None:
- log.warning("Sin id para anuncio %s", ad_key)
- continue
- entry = {
- "id": ad_id,
- "anuncio": ad_key,
- "fecha": hit["name"], # ts base como string
- "timestamp": ts_match, # entero
- "confianza": int(hit.get("confidence", 0)),
- "longitud": int(hit.get("length", 0)), # ms según lib
- "desfase_segundos": int(hit.get("offset_seconds", 0)),
- }
- resultados.setdefault(ad_id, []).append(entry)
- # Consolidación por ventana de 30s (suma confianza/longitud)
- for k, lista in resultados.items():
- lista.sort(key=lambda d: d['timestamp'])
- merged = []
- for i, it in enumerate(lista):
- if i == 0:
- merged.append(it)
- else:
- if it['timestamp'] - lista[i-1]['timestamp'] <= 30:
- merged[-1]['confianza'] += it['confianza']
- merged[-1]['longitud'] += it['longitud']
- else:
- merged.append(it)
- aux[k] = merged
- else:
- # TODO: Agregar más información del problema
- for i in pendiente.get('elementos', []):
- i['comentario'] = 'Problemas técnicos (media/anuncios vacíos)'
- # Ensamblado de respuesta
- for ad_id, lista in aux.items():
- for e in lista:
- for i in pendiente['elementos']:
- if i['id'] == e['id'] and i['anuncio'] == e['anuncio']:
- i.setdefault('encontrados', []).append({
- "fecha": e["fecha"],
- "anuncio": e["anuncio"],
- "longitud": int(e["longitud"] / 1000), # a segundos
- "confianza": e["confianza"],
- "timestamp": e["timestamp"],
- "desfase_segundos": e["desfase_segundos"]
- })
- i['comentario'] = ''
- break
- # No subimos media completa de vuelta
- pendiente["media"] = len(pendiente.get("media", []))
- enviar_resultados(pendiente)
- def _stop(*_):
- global RUNNING
- RUNNING = False
- def main_loop():
- worker_id = f"{os.uname().nodename}:{os.getpid()}" if hasattr(os, "uname") else f"pid:{os.getpid()}"
- log.info("Worker pull iniciado (%s)", worker_id)
- # Señales para detener con gracia
- signal.signal(signal.SIGTERM, _stop)
- signal.signal(signal.SIGINT, _stop)
- while RUNNING:
- try:
- job = obt_siguiente_trabajo()
- if not job:
- time.sleep(random.uniform(SLEEP_MIN, SLEEP_MAX))
- continue
- procesar_trabajo(job)
- except requests.exceptions.RequestException as net_ex:
- log.warning("Red/API: %r", net_ex)
- time.sleep(random.uniform(ERR_MIN, ERR_MAX))
- except Exception as ex:
- log.exception("Error procesando trabajo: %r", ex)
- time.sleep(random.uniform(ERR_MIN, ERR_MAX))
- log.info("Worker detenido con gracia")
- return 0
- if __name__ == '__main__':
- sys.exit(main_loop())
|