#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import print_function, absolute_import import warnings warnings.simplefilter("ignore", UserWarning) import os import sys import json import time import random import signal import logging as log from urllib.parse import quote as urlquote import requests from requests.adapters import HTTPAdapter, Retry from boxconfig import parse_config from dejavu import Dejavu from dejavu.recognize import FilePerSecondRecognizer # ========================= # Configuración general # ========================= PATH = '/tmp' API_NEXT = 'https://api.metrico.fourier.audio/comparacion/pendiente.json' API_POST = 'https://api.metrico.fourier.audio/comparacion/resultado.json' DEFAULT_TIMEOUT = 10 SLEEP_MIN, SLEEP_MAX = 0.8, 2.5 # backoff base cuando no hay trabajo ERR_MIN, ERR_MAX = 2.0, 6.0 # backoff cuando hay error RUNNING = True log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO) config = parse_config() recognizer = FilePerSecondRecognizer # Sesión HTTP robusta (reintentos + backoff) session = requests.Session() retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(429, 500, 502, 503, 504)) session.mount("https://", HTTPAdapter(max_retries=retries)) # ========================= # Utilidades HTTP # ========================= def http_get(url, **kw): return session.get(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw) def http_post(url, **kw): return session.post(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw) # ========================= # API servidor # ========================= def obt_siguiente_trabajo(): r = http_get(API_NEXT) if r.status_code == 204: return None r.raise_for_status() data = r.json() if not data or len(data.get("elementos", [])) == 0: return None return data def enviar_resultados(payload): r = http_post(API_POST, json=payload) log.info("POST resultado %s %s", r.status_code, r.text[:300]) r.raise_for_status() return r # ========================= # Descargas # ========================= def descargar_anuncio(ad_path, save_as=None): """ Descarga de GCS. Guarda con save_as (tu campo 'anuncio' ya limpio) para llave 1:1. """ filename = save_as or os.path.basename(ad_path) out_dir = os.path.join(PATH, 'ads') os.makedirs(out_dir, exist_ok=True) out_file = os.path.join(out_dir, filename) if os.path.isfile(out_file): return out_file cloud = f"https://storage.googleapis.com/{config['bucket']}" url = f"{cloud}/{urlquote(ad_path, safe='/')}" # URL-encode del path resp = http_get(url) if resp.status_code == 200: with open(out_file, "wb") as fp: fp.write(resp.content) return out_file log.warning("[Anuncio][%s] %s", resp.status_code, resp.text) return None def descargar_media(box, station, media): """ Descarga desde Firebase Storage (según tu patrón original). """ ref = f"{box}/{station}/{media}" file = os.path.basename(ref) out_dir = os.path.join(PATH, 'fourier', box, station) os.makedirs(out_dir, exist_ok=True) out_file = os.path.join(out_dir, file) if os.path.isfile(out_file): return out_file filename = ref.replace("/", "%2F").replace("+", "%2B") base = "https://firebasestorage.googleapis.com/v0/b/fourier-6e14d.appspot.com/o" url = f"{base}/{filename}?alt=media" resp = http_get(url) if resp.status_code == 200: with open(out_file, "wb") as fp: fp.write(resp.content) return out_file log.warning("[Media][%s] %s", resp.status_code, resp.text) return None # ========================= # Core de procesamiento # ========================= def procesar_trabajo(pendiente): """ - Mapea por elem['anuncio'] (valor ya limpio sin 'anuncios/'). - Huella media con Dejavu (name = timestamp base como string). - Reconoce cada anuncio y consolida hits en ventana de 30s. - Reporta resultados. """ ciudad = pendiente['origen'] estacion = pendiente['estacion'] # --- Descarga anuncios anuncios_paths = [] id_by_ad = {} for elem in pendiente.get("elementos", []): ad_key = elem["anuncio"] # limpio desde PostgreSQL (sin prefijo 'anuncios/') id_by_ad[ad_key] = elem["id"] ad_file = descargar_anuncio(elem["ruta"], save_as=ad_key) if ad_file: anuncios_paths.append(ad_file) # --- Descarga media media = [] for m in pendiente.get("media", []): f = descargar_media(ciudad, estacion, m["ruta"]) if f: try: ts = int(m["timestamp"]) except Exception: ts = 0 media.append((f, m["fecha"], ts)) log.info("Comparación: media=%d anuncios=%d", len(media), len(anuncios_paths)) djv = None resultados = {} aux = {} if len(media) > 0 and len(anuncios_paths) > 0: djv = Dejavu({"database_type": "mem"}) # Fingerprint de cada archivo de media; usamos el timestamp como 'name' for ruta, fecha, ts in media: try: djv.fingerprint_file(ruta, str(ts)) except Exception as ex: log.warning("Fingerprint error %s: %r", ruta, ex) # Reconocimiento por anuncio for ad_path in anuncios_paths: ad_key = os.path.basename(ad_path) # = elem['anuncio'] hits = djv.recognize(FilePerSecondRecognizer, ad_path, 5) or [] for hit in hits: if "id" not in hit: continue try: base_ts = int(hit["name"]) except Exception: base_ts = 0 ts_match = base_ts + int(hit.get("offset_seconds", 0)) ad_id = id_by_ad.get(ad_key) if ad_id is None: log.warning("Sin id para anuncio %s", ad_key) continue entry = { "id": ad_id, "anuncio": ad_key, "fecha": hit["name"], # ts base como string "timestamp": ts_match, # entero "confianza": int(hit.get("confidence", 0)), "longitud": int(hit.get("length", 0)), # ms según lib "desfase_segundos": int(hit.get("offset_seconds", 0)), } resultados.setdefault(ad_id, []).append(entry) # Consolidación por ventana de 30s (suma confianza/longitud) for k, lista in resultados.items(): lista.sort(key=lambda d: d['timestamp']) merged = [] for i, it in enumerate(lista): if i == 0: merged.append(it) else: if it['timestamp'] - lista[i-1]['timestamp'] <= 30: merged[-1]['confianza'] += it['confianza'] merged[-1]['longitud'] += it['longitud'] else: merged.append(it) aux[k] = merged else: # TODO: Agregar más información del problema for i in pendiente.get('elementos', []): i['comentario'] = 'Problemas técnicos (media/anuncios vacíos)' # Ensamblado de respuesta for ad_id, lista in aux.items(): for e in lista: for i in pendiente['elementos']: if i['id'] == e['id'] and i['anuncio'] == e['anuncio']: i.setdefault('encontrados', []).append({ "fecha": e["fecha"], "anuncio": e["anuncio"], "longitud": int(e["longitud"] / 1000), # a segundos "confianza": e["confianza"], "timestamp": e["timestamp"], "desfase_segundos": e["desfase_segundos"] }) i['comentario'] = '' break # No subimos media completa de vuelta pendiente["media"] = len(pendiente.get("media", [])) enviar_resultados(pendiente) def _stop(*_): global RUNNING RUNNING = False def main_loop(): worker_id = f"{os.uname().nodename}:{os.getpid()}" if hasattr(os, "uname") else f"pid:{os.getpid()}" log.info("Worker pull iniciado (%s)", worker_id) # Señales para detener con gracia signal.signal(signal.SIGTERM, _stop) signal.signal(signal.SIGINT, _stop) while RUNNING: try: job = obt_siguiente_trabajo() if not job: time.sleep(random.uniform(SLEEP_MIN, SLEEP_MAX)) continue procesar_trabajo(job) except requests.exceptions.RequestException as net_ex: log.warning("Red/API: %r", net_ex) time.sleep(random.uniform(ERR_MIN, ERR_MAX)) except Exception as ex: log.exception("Error procesando trabajo: %r", ex) time.sleep(random.uniform(ERR_MIN, ERR_MAX)) log.info("Worker detenido con gracia") return 0 if __name__ == '__main__': sys.exit(main_loop())