ondemand.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. from __future__ import print_function, absolute_import
  2. import warnings
  3. warnings.simplefilter("ignore", UserWarning)
  4. from tornado.ioloop import IOLoop
  5. from boxconfig import parse_config
  6. from dejavu.recognize import FilePerSecondRecognizer
  7. from dejavu import Dejavu
  8. from endpoint import setup_endpoint
  9. import logging as log
  10. import requests
  11. import json
  12. import time
  13. import os
  14. from queue import Queue, Empty
  15. log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
  16. PATH = '/tmp'
  17. config = parse_config()
  18. queue = Queue()
  19. recognizer = FilePerSecondRecognizer
  20. def obt_siguiente_trabajo():
  21. log.info("Consultando trabajo")
  22. url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)
  23. response = requests.get(url)
  24. return response.json()
  25. def descargar_anuncio(ad_path):
  26. anuncio = os.path.basename(ad_path)
  27. path = os.path.join(PATH, 'ads')
  28. os.makedirs(path, exist_ok=True)
  29. ruta_anuncio = os.path.join(path, anuncio)
  30. if os.path.isfile(ruta_anuncio):
  31. return ruta_anuncio
  32. cloud_base_url = 'https://storage.googleapis.com/{}' \
  33. .format(config['bucket'])
  34. url = '{}/{}'.format(cloud_base_url, ad_path)
  35. response = requests.get(url)
  36. # TODO: Agregar alerta cuando la respuesta no sea 200
  37. if response.status_code == 200:
  38. with open(ruta_anuncio, "wb") as fp:
  39. fp.write(response.content)
  40. return ruta_anuncio
  41. else:
  42. log.info("[Anuncio][error] %s" % (response.text))
  43. return None
  44. def descargar_media(box, station, media):
  45. ref = '{}/{}/{}'.format(box, station, media)
  46. file = os.path.basename(ref)
  47. path = os.path.join(PATH, 'fourier', box, station)
  48. os.makedirs(path, exist_ok=True)
  49. out_file = os.path.join(path, file)
  50. if os.path.isfile(out_file):
  51. return out_file
  52. filename = ref.replace("/","%2F") \
  53. .replace("+","%2B")
  54. cloud_base_url = '%s%s' % (
  55. 'https://firebasestorage.googleapis.com',
  56. '/v0/b/fourier-6e14d.appspot.com/o'
  57. )
  58. url = '{}/{}?alt=media'.format(cloud_base_url, filename)
  59. response = requests.get(url)
  60. if response.status_code == 200:
  61. with open(out_file, "wb") as fp:
  62. fp.write(response.content)
  63. return out_file
  64. else:
  65. log.info("[Media][url] %s" % (response.text))
  66. log.info("[Media][error] %s" % (response.text))
  67. return None
  68. def enviar_resultados(trabajo):
  69. log.info('[Pendiente] %s' % (json.dumps(trabajo),))
  70. url = 'https://api.fourier.audio/na/calendario/resultado'
  71. response = requests.post(url, json=trabajo)
  72. log.info('[Response] %s' % (response.text))
  73. return response
  74. def llenar_pila():
  75. """ Search for pending scheduled work in
  76. server and add them to a memory queue. """
  77. try:
  78. response = obt_siguiente_trabajo()
  79. if len(response["elementos"]) > 0:
  80. queue.put(response)
  81. if queue.qsize() > 0:
  82. loop.add_callback(procesar_siguiente_pila)
  83. else:
  84. loop.add_timeout(time.time() + 30, llenar_pila)
  85. except Exception as ex:
  86. """ Errores desconocidos """
  87. log.error('[feed_queue] {}'.format(ex))
  88. loop.add_timeout(time.time() + 60, llenar_pila)
  89. raise ex
  90. def procesar_siguiente_pila():
  91. """ Try to the next item in a queue and start
  92. processing it accordingly. If success, repeat
  93. the function or go to feed if no more items. """
  94. try:
  95. item = queue.get(False)
  96. procesar_trabajo(item)
  97. loop.add_callback(procesar_siguiente_pila)
  98. except Empty:
  99. loop.add_callback(llenar_pila)
  100. except Exception as ex:
  101. log.error(ex)
  102. loop.add_callback(procesar_siguiente_pila)
  103. def procesar_trabajo(pendiente):
  104. ciudad = pendiente['origen']
  105. estacion = pendiente['estacion']
  106. confianza = 35
  107. segmento = 5
  108. if "segmento" in pendiente:
  109. segmento = int(pendiente["segmento"])
  110. if "confianza" in pendiente:
  111. confianza = int(pendiente["confianza"])
  112. # Descarga de anuncios
  113. log.info("Descargando anuncios")
  114. try:
  115. anuncios = []
  116. id_by_ad = {}
  117. item_ids = []
  118. for i in pendiente["elementos"]:
  119. id_by_ad[i['anuncio']] = i['id']
  120. if i['id'] not in item_ids:
  121. item_ids.append(i['id'])
  122. anuncio = descargar_anuncio(i["ruta"])
  123. if anuncio is not None:
  124. anuncios.append(anuncio)
  125. except Exception as err:
  126. log.info('[process_segment] [{}] {}'.format(estacion, err))
  127. # Descarga de media
  128. log.info("Descargando media")
  129. try:
  130. media = []
  131. for i in pendiente["media"]:
  132. archivo = descargar_media(ciudad, estacion, i["ruta"])
  133. if archivo is not None:
  134. media.append((archivo, i["fecha"], i["timestamp"]))
  135. except Exception as err:
  136. log.info(err)
  137. log.info("Inicia la comparacion, tamaño de segmento %s" % (segmento,))
  138. try:
  139. dejavu = None
  140. resultados = {}
  141. if len(media) > 0 and len(anuncio) > 0:
  142. dejavu = Dejavu({"database_type": "mem"})
  143. try:
  144. x = 0
  145. for ruta, fecha, ts in media:
  146. log.info("Huellando %s" % (ruta,))
  147. dejavu.fingerprint_file(ruta, ts)
  148. except Exception as ex:
  149. log.info(ex)
  150. for anuncio in anuncios:
  151. for i in dejavu.recognize(recognizer, anuncio, 5):
  152. if not "id" in i:
  153. continue
  154. nombre_anuncio = os.path.split(anuncio)[-1]
  155. id = id_by_ad[nombre_anuncio]
  156. if id not in resultados:
  157. resultados[id] = []
  158. obj = i
  159. obj["match_time"] = None
  160. dict = {
  161. "id": id,
  162. "anuncio": anuncio,
  163. "fecha": obj["name"],
  164. "timestamp": obj["name"] + int(obj['offset_seconds']),
  165. "confianza": obj["confidence"],
  166. "longitud": obj["length"],
  167. "desfase_segundos": obj["offset_seconds"]
  168. }
  169. resultados[id].append(dict)
  170. aux = {}
  171. for k in resultados.keys():
  172. lista = sorted(resultados[k], key=lambda d: d['timestamp'])
  173. lista_nueva = []
  174. ult = None
  175. for x in range(0, len(lista)):
  176. if x == 0:
  177. ult = x
  178. lista_nueva.append(lista[ult])
  179. else:
  180. dif = lista[x]['timestamp'] - lista[x - 1]['timestamp']
  181. if dif <= 30:
  182. lista_nueva[ult]['confianza'] = int(lista_nueva[ult]['confianza']) + int(lista[x]['confianza'])
  183. lista_nueva[ult]['longitud'] = int(lista_nueva[ult]['longitud']) +int(lista[x]['longitud'])
  184. else:
  185. lista_nueva.append(lista[x])
  186. ult = len(lista_nueva) - 1
  187. aux[k] = lista_nueva
  188. else:
  189. for i in pendiente['elementos']:
  190. i['comentario'] = 'Problemas técnicos'
  191. for id in aux:
  192. for e in aux[id]:
  193. for i in pendiente['elementos']:
  194. i['comentario'] = ''
  195. anuncio = e['anuncio'].replace('/tmp/ads/', '')
  196. if i['id'] == e['id'] and i['anuncio'] == anuncio:
  197. if 'encontrados' not in i:
  198. i['encontrados'] = []
  199. obj = {
  200. "fecha": e["fecha"],
  201. "anuncio": anuncio,
  202. "longitud": int(e["longitud"] / 1000),
  203. "confianza": e["confianza"],
  204. "timestamp": e["timestamp"],
  205. "desfase_segundos": e["desfase_segundos"]
  206. }
  207. i['encontrados'].append(obj)
  208. break
  209. # log.info(json.dumps(extras))
  210. log.info("[Resultado] %s" % (json.dumps(resultados)))
  211. pendiente["media"] = None
  212. enviar_resultados(pendiente)
  213. except Exception as ex:
  214. log.info(ex)
  215. app = setup_endpoint(queue=queue)
  216. loop = IOLoop.current()
  217. loop.add_callback(llenar_pila)
  218. if __name__ == '__main__':
  219. try:
  220. log.info('Starting ondemand service')
  221. loop.start()
  222. except KeyboardInterrupt:
  223. log.error('Process killed')