@@ -1,267 +1,285 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
 from __future__ import print_function, absolute_import
 import warnings
 warnings.simplefilter("ignore", UserWarning)
 
-from tornado.ioloop import IOLoop
-from boxconfig import parse_config
-from dejavu.recognize import FilePerSecondRecognizer
-from dejavu import Dejavu
-from endpoint import setup_endpoint
-import logging as log
-import requests
+import os
+import sys
 import json
 import time
-import os
+import random
+import signal
+import logging as log
+from urllib.parse import quote as urlquote
 
-from queue import Queue, Empty
+import requests
+from requests.adapters import HTTPAdapter, Retry
 
-log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
+from boxconfig import parse_config
+from dejavu import Dejavu
+from dejavu.recognize import FilePerSecondRecognizer
+
+
+# =========================
+# General configuration
+# =========================
 PATH = '/tmp'
+API_NEXT = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
+API_POST = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
+
+DEFAULT_TIMEOUT = 10
+SLEEP_MIN, SLEEP_MAX = 0.8, 2.5  # base backoff when there is no work
+ERR_MIN, ERR_MAX = 2.0, 6.0  # backoff after an error
+RUNNING = True
+
+log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
+
+
 config = parse_config()
-queue = Queue()
 recognizer = FilePerSecondRecognizer
 
+# Robust HTTP session (retries + backoff)
+session = requests.Session()
+retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(429, 500, 502, 503, 504))
+session.mount("https://", HTTPAdapter(max_retries=retries))
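+# Retry re-issues a request for the status codes listed above and for
+# connection errors, with exponential backoff between attempts; once the
+# retries are exhausted the requests exception propagates to the caller
+# (handled in main_loop below).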
 
 
-def obt_siguiente_trabajo():
-    url = 'https://api.metrico.fourier.audio/comparacion/pendiente.json'
-    response = requests.get(url)
-    return response.json()
-
-
-def descargar_anuncio(ad_path):
-    anuncio = os.path.basename(ad_path)
-    path = os.path.join(PATH, 'ads')
-    os.makedirs(path, exist_ok=True)
-    ruta_anuncio = os.path.join(path, anuncio)
-
-    if os.path.isfile(ruta_anuncio):
-        return ruta_anuncio
-    cloud_base_url = 'https://storage.googleapis.com/{}' \
-        .format(config['bucket'])
-    url = '{}/{}'.format(cloud_base_url, ad_path)
-    response = requests.get(url)
-
-    # TODO: Agregar alerta cuando la respuesta no sea 200
-    if response.status_code == 200:
-        with open(ruta_anuncio, "wb") as fp:
-            fp.write(response.content)
-        return ruta_anuncio
-    else:
-        log.info("[Anuncio][error] %s" % (response.text))
+# =========================
+# HTTP helpers
+# =========================
+def http_get(url, **kw):
+    return session.get(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
+
+
+def http_post(url, **kw):
+    return session.post(url, timeout=kw.pop("timeout", DEFAULT_TIMEOUT), **kw)
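+# requests has no session-wide default timeout, so every call is funneled
+# through these wrappers to guarantee DEFAULT_TIMEOUT unless the caller
+# overrides it explicitly.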
+
+
+# =========================
+# Server API
+# =========================
+def obt_siguiente_trabajo():
+    r = http_get(API_NEXT)
+    if r.status_code == 204:
+        return None
+    r.raise_for_status()
+    data = r.json()
+    if not data or len(data.get("elementos", [])) == 0:
         return None
+    return data
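+# Contract assumed by the checks above: HTTP 204 or an empty "elementos" list
+# means there is no pending work, and the caller simply sleeps and polls again.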
+
+
+def enviar_resultados(payload):
+    r = http_post(API_POST, json=payload)
+    log.info("POST resultado %s %s", r.status_code, r.text[:300])
+    r.raise_for_status()
+    return r
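+# Because of raise_for_status a rejected result surfaces as an exception in
+# main_loop; nothing is acknowledged locally, so presumably the job is offered
+# again on a later poll (server-side behaviour, not visible in this patch).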
+
+
+# =========================
+# Downloads
+# =========================
+def descargar_anuncio(ad_path, save_as=None):
+    """
+    Download from GCS. Save under `save_as` (the already-clean 'anuncio' field)
+    so the local file name is a 1:1 key back to the pending element.
+    """
+    filename = save_as or os.path.basename(ad_path)
+    out_dir = os.path.join(PATH, 'ads')
+    os.makedirs(out_dir, exist_ok=True)
+    out_file = os.path.join(out_dir, filename)
+
+    if os.path.isfile(out_file):
+        return out_file
+
+    cloud = f"https://storage.googleapis.com/{config['bucket']}"
+    url = f"{cloud}/{urlquote(ad_path, safe='/')}"  # URL-encode the path
+    resp = http_get(url)
+    if resp.status_code == 200:
+        with open(out_file, "wb") as fp:
+            fp.write(resp.content)
+        return out_file
+
+    log.warning("[Anuncio][%s] %s", resp.status_code, resp.text)
+    return None
 
 
 def descargar_media(box, station, media):
-    ref = '{}/{}/{}'.format(box, station, media)
+    """
+    Download from Firebase Storage (same URL pattern as the original code).
+    """
+    ref = f"{box}/{station}/{media}"
     file = os.path.basename(ref)
-    path = os.path.join(PATH, 'fourier', box, station)
-    os.makedirs(path, exist_ok=True)
-    out_file = os.path.join(path, file)
+    out_dir = os.path.join(PATH, 'fourier', box, station)
+    os.makedirs(out_dir, exist_ok=True)
+    out_file = os.path.join(out_dir, file)
 
     if os.path.isfile(out_file):
         return out_file
 
-    filename = ref.replace("/","%2F") \
-        .replace("+","%2B")
-    cloud_base_url = '%s%s' % (
-        'https://firebasestorage.googleapis.com',
-        '/v0/b/fourier-6e14d.appspot.com/o'
-    )
-    url = '{}/{}?alt=media'.format(cloud_base_url, filename)
-    response = requests.get(url)
-
-    if response.status_code == 200:
+    filename = ref.replace("/", "%2F").replace("+", "%2B")
+    base = "https://firebasestorage.googleapis.com/v0/b/fourier-6e14d.appspot.com/o"
+    url = f"{base}/{filename}?alt=media"
+    resp = http_get(url)
+    if resp.status_code == 200:
         with open(out_file, "wb") as fp:
-            fp.write(response.content)
-        return out_file
-    else:
-        log.info("[Media][url] %s" % (response.text))
-        log.info("[Media][error] %s" % (response.text))
-        return None
-
+            fp.write(resp.content)
+        return out_file
 
-def enviar_resultados(trabajo):
-    print('[Pendiente] %s' % (json.dumps(trabajo),))
-    url = 'https://api.metrico.fourier.audio/comparacion/resultado.json'
-    response = requests.post(url, json=trabajo)
-    print('[Response] %s' % (response.text))
-    return response
-
-
-def llenar_pila():
-    """ Search for pending scheduled work in
-    server and add them to a memory queue. """
-    try:
-        response = obt_siguiente_trabajo()
-        if len(response["elementos"]) > 0:
-            queue.put(response)
-
-        if queue.qsize() > 0:
-            loop.add_callback(procesar_siguiente_pila)
-        else:
-            loop.add_timeout(time.time() + 30, llenar_pila)
-
-    except Exception as ex:
-        """ Errores desconocidos """
-        log.error('[feed_queue] {}'.format(ex))
-        loop.add_timeout(time.time() + 60, llenar_pila)
-        raise ex
-
-
-def procesar_siguiente_pila():
-    """ Try to the next item in a queue and start
-    processing it accordingly. If success, repeat
-    the function or go to feed if no more items. """
-    try:
-        item = queue.get(False)
-        procesar_trabajo(item)
-        loop.add_callback(procesar_siguiente_pila)
-    except Empty:
-        loop.add_callback(llenar_pila)
-    except Exception as ex:
-        log.error(ex)
-        loop.add_callback(procesar_siguiente_pila)
+
+    log.warning("[Media][%s] %s", resp.status_code, resp.text)
+    return None
+
+
+# =========================
+# Processing core
+# =========================
 def procesar_trabajo(pendiente):
-    ciudad = pendiente['origen']
+    """
+    - Maps hits by elem['anuncio'] (value already clean, no 'anuncios/' prefix).
+    - Fingerprints the media with Dejavu (name = base timestamp as a string).
+    - Recognizes each ad and consolidates hits within a 30 s window.
+    - Reports the results.
+    """
+    ciudad = pendiente['origen']
     estacion = pendiente['estacion']
-    confianza = 35
-    segmento = 5
-    #if "segmento" in pendiente:
-    #    segmento = int(pendiente["segmento"])
-
-    #if "confianza" in pendiente:
-    #    confianza = int(pendiente["confianza"])
-
-    # Descarga de anuncios
-    log.info("Descargando anuncios")
-    try:
-        anuncios = []
-        id_by_ad = {}
-        item_ids = []
-        for i in pendiente["elementos"]:
-            id_by_ad[i['anuncio']] = i['id']
-            if i['id'] not in item_ids:
-                item_ids.append(i['id'])
-
-            anuncio = descargar_anuncio(i["ruta"])
-            if anuncio is not None:
-                anuncios.append(anuncio)
-
-    except Exception as err:
-        log.info('[process_segment] [{}] {}'.format(estacion, err))
-
-    # Descarga de media
-    log.info("Descargando media")
-    try:
-        media = []
-        for i in pendiente["media"]:
-            archivo = descargar_media(ciudad, estacion, i["ruta"])
-            if archivo is not None:
-                media.append((archivo, i["fecha"], i["timestamp"]))
-
-    except Exception as err:
-        log.info(err)
-
-    log.info("Inicia la comparacion, tamaño de segmento %s" % (segmento,))
-    try:
-        dejavu = None
-        resultados = {}
-        aux = {}
-        if len(media) > 0 and len(anuncio) > 0:
-            dejavu = Dejavu({"database_type": "mem"})
+
+    # --- Download ads
+    anuncios_paths = []
+    id_by_ad = {}
+    for elem in pendiente.get("elementos", []):
+        ad_key = elem["anuncio"]  # already clean in PostgreSQL (no 'anuncios/' prefix)
+        id_by_ad[ad_key] = elem["id"]
+
+        ad_file = descargar_anuncio(elem["ruta"], save_as=ad_key)
+        if ad_file:
+            anuncios_paths.append(ad_file)
+
+    # --- Download media
+    media = []
+    for m in pendiente.get("media", []):
+        f = descargar_media(ciudad, estacion, m["ruta"])
+        if f:
+            try:
+                ts = int(m["timestamp"])
+            except Exception:
+                ts = 0
+            media.append((f, m["fecha"], ts))
+
+    log.info("Comparación: media=%d anuncios=%d", len(media), len(anuncios_paths))
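+    # Elements whose files could not be downloaded are simply skipped: they
+    # produce no hits below and go back to the server without 'encontrados'.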
+
+    djv = None
+    resultados = {}
+    aux = {}
+
+    if len(media) > 0 and len(anuncios_paths) > 0:
+        djv = Dejavu({"database_type": "mem"})
+
+        # Fingerprint every media file; the timestamp is used as the 'name'
+        for ruta, fecha, ts in media:
             try:
-            x = 0
-            for ruta, fecha, ts in media:
-                log.info("Huellando %s" % (ruta,))
-                dejavu.fingerprint_file(ruta, ts)
+                djv.fingerprint_file(ruta, str(ts))
             except Exception as ex:
-                log.info(ex)
-
-        for anuncio in anuncios:
-            for i in dejavu.recognize(recognizer, anuncio, 5):
-                if not "id" in i:
-                    continue
-
-                nombre_anuncio = os.path.split(anuncio)[-1]
-                id = id_by_ad[nombre_anuncio]
-                if id not in resultados:
-                    resultados[id] = []
-
-                obj = i
-                obj["match_time"] = None
-                dict = {
-                    "id": id,
-                    "anuncio": anuncio,
-                    "fecha": obj["name"],
-                    "timestamp": obj["name"] + int(obj['offset_seconds']),
-                    "confianza": obj["confidence"],
-                    "longitud": obj["length"],
-                    "desfase_segundos": obj["offset_seconds"]
-                }
-                resultados[id].append(dict)
-
-        for k in resultados.keys():
-            lista = sorted(resultados[k], key=lambda d: d['timestamp'])
-            lista_nueva = []
-            ult = None
-            for x in range(0, len(lista)):
-                if x == 0:
-                    ult = x
-                    lista_nueva.append(lista[ult])
+                log.warning("Fingerprint error %s: %r", ruta, ex)
+
+        # Recognition per ad
+        for ad_path in anuncios_paths:
+            ad_key = os.path.basename(ad_path)  # = elem['anuncio']
+            hits = djv.recognize(FilePerSecondRecognizer, ad_path, 5) or []
+            for hit in hits:
+                if "id" not in hit:
+                    continue
+                try:
+                    base_ts = int(hit["name"])
+                except Exception:
+                    base_ts = 0
+                ts_match = base_ts + int(hit.get("offset_seconds", 0))
+
+                ad_id = id_by_ad.get(ad_key)
+                if ad_id is None:
+                    log.warning("Sin id para anuncio %s", ad_key)
+                    continue
+
+                entry = {
+                    "id": ad_id,
+                    "anuncio": ad_key,
+                    "fecha": hit["name"],  # base ts as a string
+                    "timestamp": ts_match,  # integer
+                    "confianza": int(hit.get("confidence", 0)),
+                    "longitud": int(hit.get("length", 0)),  # ms according to the library
+                    "desfase_segundos": int(hit.get("offset_seconds", 0)),
+                }
+                resultados.setdefault(ad_id, []).append(entry)
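+                # hit['name'] carries the value passed to fingerprint_file
+                # above, i.e. the base timestamp of the recording the hit
+                # landed in, so base_ts plus the offset is the absolute second
+                # of the match.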
+
+        # Consolidate hits within a 30 s window (confidence and length are summed)
+        for k, lista in resultados.items():
+            lista.sort(key=lambda d: d['timestamp'])
+            merged = []
+            for i, it in enumerate(lista):
+                if i == 0:
+                    merged.append(it)
+                else:
+                    if it['timestamp'] - lista[i-1]['timestamp'] <= 30:
+                        merged[-1]['confianza'] += it['confianza']
+                        merged[-1]['longitud'] += it['longitud']
                     else:
-                        dif = lista[x]['timestamp'] - lista[x - 1]['timestamp']
-                        if dif <= 30:
-                            lista_nueva[ult]['confianza'] = int(lista_nueva[ult]['confianza']) + int(lista[x]['confianza'])
-                            lista_nueva[ult]['longitud'] = int(lista_nueva[ult]['longitud']) +int(lista[x]['longitud'])
-                        else:
-                            lista_nueva.append(lista[x])
-                            ult = len(lista_nueva) - 1
+                        merged.append(it)
+            aux[k] = merged
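+        # Example: sorted hits at t=100, 110 and 150 collapse to two entries:
+        # 100 absorbs 110 (gap 10 s), while 150 opens a new one (gap 40 s > 30).
+        # Gaps are measured against the previous raw hit, so a long chain of
+        # closely spaced hits merges into a single entry.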
-                aux[k] = lista_nueva
 
+    else:
+        # TODO: add more information about the problem
+        for i in pendiente.get('elementos', []):
+            i['comentario'] = 'Problemas técnicos (media/anuncios vacíos)'
 
-        else:
+    # Assemble the response
+    for ad_id, lista in aux.items():
+        for e in lista:
             for i in pendiente['elementos']:
-                i['comentario'] = 'Problemas técnicos'
-
-        for id in aux:
-            for e in aux[id]:
-                for i in pendiente['elementos']:
+                if i['id'] == e['id'] and i['anuncio'] == e['anuncio']:
+                    i.setdefault('encontrados', []).append({
+                        "fecha": e["fecha"],
+                        "anuncio": e["anuncio"],
+                        "longitud": int(e["longitud"] / 1000),  # to seconds
+                        "confianza": e["confianza"],
+                        "timestamp": e["timestamp"],
+                        "desfase_segundos": e["desfase_segundos"]
+                    })
                     i['comentario'] = ''
-                    anuncio = e['anuncio'].replace('/tmp/ads/', '')
-                    if i['id'] == e['id'] and i['anuncio'] == anuncio:
-                        if 'encontrados' not in i:
-                            i['encontrados'] = []
-                        obj = {
-                            "fecha": e["fecha"],
-                            "anuncio": anuncio,
-                            "longitud": int(e["longitud"] / 1000),
-                            "confianza": e["confianza"],
-                            "timestamp": e["timestamp"],
-                            "desfase_segundos": e["desfase_segundos"]
-                        }
-                        i['encontrados'].append(obj)
-                        break
-
-        # log.info(json.dumps(extras))
-        log.info("[Resultado] %s" % (json.dumps(resultados)))
-        pendiente["media"] = None
-        enviar_resultados(pendiente)
-
-    except Exception as ex:
-        log.info(ex)
-
-
-app = setup_endpoint(queue=queue)
-loop = IOLoop.current()
-loop.add_callback(llenar_pila)
+                    break
+
+    # Do not send the full media list back
+    pendiente["media"] = len(pendiente.get("media", []))
+    enviar_resultados(pendiente)
+
+
+def _stop(*_):
+    global RUNNING
+    RUNNING = False
+
+
+def main_loop():
+    worker_id = f"{os.uname().nodename}:{os.getpid()}" if hasattr(os, "uname") else f"pid:{os.getpid()}"
+    log.info("Worker pull iniciado (%s)", worker_id)
+
+    # Signals for graceful shutdown
+    signal.signal(signal.SIGTERM, _stop)
+    signal.signal(signal.SIGINT, _stop)
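+    # The handlers only flip RUNNING, so a job that is already being processed
+    # is allowed to finish before the loop exits.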
+
+    while RUNNING:
+        try:
+            job = obt_siguiente_trabajo()
+            if not job:
+                time.sleep(random.uniform(SLEEP_MIN, SLEEP_MAX))
+                continue
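+            # The random sleep adds jitter so several workers polling the same
+            # endpoint do not fall into lockstep.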
+
+            procesar_trabajo(job)
+
+        except requests.exceptions.RequestException as net_ex:
+            log.warning("Red/API: %r", net_ex)
+            time.sleep(random.uniform(ERR_MIN, ERR_MAX))
+        except Exception as ex:
+            log.exception("Error procesando trabajo: %r", ex)
+            time.sleep(random.uniform(ERR_MIN, ERR_MAX))
+
+    log.info("Worker detenido con gracia")
+    return 0
+
+
 if __name__ == '__main__':
-    try:
-        log.info('Starting ondemand service')
-        loop.start()
-    except KeyboardInterrupt:
-        log.error('Process killed')
+    sys.exit(main_loop())