ondemand.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from __future__ import print_function, absolute_import
  4. import warnings
  5. warnings.simplefilter("ignore", UserWarning)
  6. import os
  7. import sys
  8. import json
  9. import time
  10. import random
  11. import signal
  12. import logging as log
  13. from urllib.parse import quote as urlquote
  14. import requests
  15. from requests.adapters import HTTPAdapter, Retry
  16. from boxconfig import parse_config
  17. from dejavu import Dejavu
  18. from dejavu.recognize import FilePerSecondRecognizer
  19. # =========================
  20. # Configuración general
  21. # =========================
  22. PATH = '/tmp'
  23. API_NEXT = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
  24. API_POST = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
  25. DEFAULT_TIMEOUT = 10
  26. SLEEP_MIN, SLEEP_MAX = 0.8, 2.5 # backoff base cuando no hay trabajo
  27. ERR_MIN, ERR_MAX = 2.0, 6.0 # backoff cuando hay error
  28. RUNNING = True
  29. log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
  30. config = parse_config()
  31. recognizer = FilePerSecondRecognizer
  32. # Sesión HTTP robusta (reintentos + backoff)
  33. session = requests.Session()
  34. retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(429, 500, 502, 503, 504))
  35. session.mount("https://", HTTPAdapter(max_retries=retries))
  36. # =========================
  37. # Utilidades HTTP
  38. # =========================
  39. def http_get(url, **kw):
  40. return session.get(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
  41. def http_post(url, **kw):
  42. return session.post(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
  43. # =========================
  44. # API servidor
  45. # =========================
  46. def obt_siguiente_trabajo():
  47. r = http_get(API_NEXT)
  48. if r.status_code == 204:
  49. return None
  50. r.raise_for_status()
  51. data = r.json()
  52. if not data or len(data.get("elementos", [])) == 0:
  53. return None
  54. return data
  55. def enviar_resultados(payload):
  56. r = http_post(API_POST, json=payload)
  57. log.info("POST resultado %s %s", r.status_code, r.text[:300])
  58. r.raise_for_status()
  59. return r
  60. # =========================
  61. # Descargas
  62. # =========================
  63. def descargar_anuncio(ad_path, save_as=None):
  64. """
  65. Descarga de GCS. Guarda con save_as (tu campo 'anuncio' ya limpio) para llave 1:1.
  66. """
  67. filename = save_as or os.path.basename(ad_path)
  68. out_dir = os.path.join(PATH, 'ads')
  69. os.makedirs(out_dir, exist_ok=True)
  70. out_file = os.path.join(out_dir, filename)
  71. if os.path.isfile(out_file):
  72. return out_file
  73. cloud = f"https://storage.googleapis.com/{config['bucket']}"
  74. url = f"{cloud}/{urlquote(ad_path, safe='/')}" # URL-encode del path
  75. resp = http_get(url)
  76. if resp.status_code == 200:
  77. with open(out_file, "wb") as fp:
  78. fp.write(resp.content)
  79. return out_file
  80. log.warning("[Anuncio][%s] %s", resp.status_code, resp.text)
  81. return None
  82. def descargar_media(box, station, media):
  83. """
  84. Descarga desde Firebase Storage (según tu patrón original).
  85. """
  86. ref = f"{box}/{station}/{media}"
  87. file = os.path.basename(ref)
  88. out_dir = os.path.join(PATH, 'fourier', box, station)
  89. os.makedirs(out_dir, exist_ok=True)
  90. out_file = os.path.join(out_dir, file)
  91. if os.path.isfile(out_file):
  92. return out_file
  93. filename = ref.replace("/", "%2F").replace("+", "%2B")
  94. base = "https://firebasestorage.googleapis.com/v0/b/fourier-6e14d.appspot.com/o"
  95. url = f"{base}/{filename}?alt=media"
  96. resp = http_get(url)
  97. if resp.status_code == 200:
  98. with open(out_file, "wb") as fp:
  99. fp.write(resp.content)
  100. return out_file
  101. log.warning("[Media][%s] %s", resp.status_code, resp.text)
  102. return None
  103. # =========================
  104. # Core de procesamiento
  105. # =========================
  106. def procesar_trabajo(pendiente):
  107. """
  108. - Mapea por elem['anuncio'] (valor ya limpio sin 'anuncios/').
  109. - Huella media con Dejavu (name = timestamp base como string).
  110. - Reconoce cada anuncio y consolida hits en ventana de 30s.
  111. - Reporta resultados.
  112. """
  113. ciudad = pendiente['origen']
  114. estacion = pendiente['estacion']
  115. # --- Descarga anuncios
  116. anuncios_paths = []
  117. id_by_ad = {}
  118. for elem in pendiente.get("elementos", []):
  119. ad_key = elem["anuncio"] # limpio desde PostgreSQL (sin prefijo 'anuncios/')
  120. id_by_ad[ad_key] = elem["id"]
  121. ad_file = descargar_anuncio(elem["ruta"], save_as=ad_key)
  122. if ad_file:
  123. anuncios_paths.append(ad_file)
  124. # --- Descarga media
  125. media = []
  126. for m in pendiente.get("media", []):
  127. f = descargar_media(ciudad, estacion, m["ruta"])
  128. if f:
  129. try:
  130. ts = int(m["timestamp"])
  131. except Exception:
  132. ts = 0
  133. media.append((f, m["fecha"], ts))
  134. log.info("Comparación: media=%d anuncios=%d", len(media), len(anuncios_paths))
  135. djv = None
  136. resultados = {}
  137. aux = {}
  138. if len(media) > 0 and len(anuncios_paths) > 0:
  139. djv = Dejavu({"database_type": "mem"})
  140. # Fingerprint de cada archivo de media; usamos el timestamp como 'name'
  141. for ruta, fecha, ts in media:
  142. try:
  143. djv.fingerprint_file(ruta, str(ts))
  144. except Exception as ex:
  145. log.warning("Fingerprint error %s: %r", ruta, ex)
  146. # Reconocimiento por anuncio
  147. for ad_path in anuncios_paths:
  148. ad_key = os.path.basename(ad_path) # = elem['anuncio']
  149. hits = djv.recognize(FilePerSecondRecognizer, ad_path, 5) or []
  150. for hit in hits:
  151. if "id" not in hit:
  152. continue
  153. try:
  154. base_ts = int(hit["name"])
  155. except Exception:
  156. base_ts = 0
  157. ts_match = base_ts + int(hit.get("offset_seconds", 0))
  158. ad_id = id_by_ad.get(ad_key)
  159. if ad_id is None:
  160. log.warning("Sin id para anuncio %s", ad_key)
  161. continue
  162. entry = {
  163. "id": ad_id,
  164. "anuncio": ad_key,
  165. "fecha": hit["name"], # ts base como string
  166. "timestamp": ts_match, # entero
  167. "confianza": int(hit.get("confidence", 0)),
  168. "longitud": int(hit.get("length", 0)), # ms según lib
  169. "desfase_segundos": int(hit.get("offset_seconds", 0)),
  170. }
  171. resultados.setdefault(ad_id, []).append(entry)
  172. # Consolidación por ventana de 30s (suma confianza/longitud)
  173. for k, lista in resultados.items():
  174. lista.sort(key=lambda d: d['timestamp'])
  175. merged = []
  176. for i, it in enumerate(lista):
  177. if i == 0:
  178. merged.append(it)
  179. else:
  180. if it['timestamp'] - lista[i-1]['timestamp'] <= 30:
  181. merged[-1]['confianza'] += it['confianza']
  182. merged[-1]['longitud'] += it['longitud']
  183. else:
  184. merged.append(it)
  185. aux[k] = merged
  186. else:
  187. # TODO: Agregar más información del problema
  188. for i in pendiente.get('elementos', []):
  189. i['comentario'] = 'Problemas técnicos (media/anuncios vacíos)'
  190. # Ensamblado de respuesta
  191. for ad_id, lista in aux.items():
  192. for e in lista:
  193. for i in pendiente['elementos']:
  194. if i['id'] == e['id'] and i['anuncio'] == e['anuncio']:
  195. i.setdefault('encontrados', []).append({
  196. "fecha": e["fecha"],
  197. "anuncio": e["anuncio"],
  198. "longitud": int(e["longitud"] / 1000), # a segundos
  199. "confianza": e["confianza"],
  200. "timestamp": e["timestamp"],
  201. "desfase_segundos": e["desfase_segundos"]
  202. })
  203. i['comentario'] = ''
  204. break
  205. # No subimos media completa de vuelta
  206. pendiente["media"] = len(pendiente.get("media", []))
  207. enviar_resultados(pendiente)
  208. def _stop(*_):
  209. global RUNNING
  210. RUNNING = False
  211. def main_loop():
  212. worker_id = f"{os.uname().nodename}:{os.getpid()}" if hasattr(os, "uname") else f"pid:{os.getpid()}"
  213. log.info("Worker pull iniciado (%s)", worker_id)
  214. # Señales para detener con gracia
  215. signal.signal(signal.SIGTERM, _stop)
  216. signal.signal(signal.SIGINT, _stop)
  217. while RUNNING:
  218. try:
  219. job = obt_siguiente_trabajo()
  220. if not job:
  221. time.sleep(random.uniform(SLEEP_MIN, SLEEP_MAX))
  222. continue
  223. procesar_trabajo(job)
  224. except requests.exceptions.RequestException as net_ex:
  225. log.warning("Red/API: %r", net_ex)
  226. time.sleep(random.uniform(ERR_MIN, ERR_MAX))
  227. except Exception as ex:
  228. log.exception("Error procesando trabajo: %r", ex)
  229. time.sleep(random.uniform(ERR_MIN, ERR_MAX))
  230. log.info("Worker detenido con gracia")
  231. return 0
  232. if __name__ == '__main__':
  233. sys.exit(main_loop())