ondemand.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. from __future__ import print_function, absolute_import
  2. import warnings
  3. warnings.simplefilter("ignore", UserWarning)
  4. from tornado.ioloop import IOLoop
  5. from boxconfig import parse_config
  6. from dejavu.recognize import FilePerSecondRecognizer
  7. from dejavu import Dejavu
  8. from endpoint import setup_endpoint
  9. import logging as log
  10. import requests
  11. import json
  12. import time
  13. import os
  14. from queue import Queue, Empty
  15. log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
  16. PATH = '/tmp'
  17. config = parse_config()
  18. queue = Queue()
  19. recognizer = FilePerSecondRecognizer
  20. def obt_siguiente_trabajo():
  21. url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)
  22. response = requests.get(url)
  23. return response.json()
  24. def descargar_anuncio(ad_path):
  25. anuncio = os.path.basename(ad_path)
  26. path = os.path.join(PATH, 'ads')
  27. os.makedirs(path, exist_ok=True)
  28. ruta_anuncio = os.path.join(path, anuncio)
  29. if os.path.isfile(ruta_anuncio):
  30. return ruta_anuncio
  31. cloud_base_url = 'https://storage.googleapis.com/{}' \
  32. .format(config['bucket'])
  33. url = '{}/{}'.format(cloud_base_url, ad_path)
  34. response = requests.get(url)
  35. # TODO: Agregar alerta cuando la respuesta no sea 200
  36. if response.status_code == 200:
  37. with open(ruta_anuncio, "wb") as fp:
  38. fp.write(response.content)
  39. return ruta_anuncio
  40. else:
  41. log.info("[Anuncio][error] %s" % (response.text))
  42. return None
  43. def descargar_media(box, station, media):
  44. ref = '{}/{}/{}'.format(box, station, media)
  45. file = os.path.basename(ref)
  46. path = os.path.join(PATH, 'fourier', box, station)
  47. os.makedirs(path, exist_ok=True)
  48. out_file = os.path.join(path, file)
  49. if os.path.isfile(out_file):
  50. return out_file
  51. filename = ref.replace("/","%2F") \
  52. .replace("+","%2B")
  53. cloud_base_url = '%s%s' % (
  54. 'https://firebasestorage.googleapis.com',
  55. '/v0/b/fourier-6e14d.appspot.com/o'
  56. )
  57. url = '{}/{}?alt=media'.format(cloud_base_url, filename)
  58. response = requests.get(url)
  59. if response.status_code == 200:
  60. with open(out_file, "wb") as fp:
  61. fp.write(response.content)
  62. return out_file
  63. else:
  64. log.info("[Media][url] %s" % (response.text))
  65. log.info("[Media][error] %s" % (response.text))
  66. return None
  67. def enviar_resultados(trabajo):
  68. log.info('[Pendiente] %s' % (json.dumps(trabajo),))
  69. url = 'https://api.fourier.audio/v1/calendario/resultado'
  70. # response = requests.post(url, json=trabajo)
  71. # log.info('[Response] %s' % (response.text))
  72. # return response
  73. def llenar_pila():
  74. """ Search for pending scheduled work in
  75. server and add them to a memory queue. """
  76. try:
  77. response = obt_siguiente_trabajo()
  78. if len(response["elementos"]) > 0:
  79. queue.put(response)
  80. if queue.qsize() > 0:
  81. loop.add_callback(procesar_siguiente_pila)
  82. else:
  83. loop.add_timeout(time.time() + 30, llenar_pila)
  84. except Exception as ex:
  85. """ Errores desconocidos """
  86. log.error('[feed_queue] {}'.format(ex))
  87. loop.add_timeout(time.time() + 60, llenar_pila)
  88. raise ex
  89. def procesar_siguiente_pila():
  90. """ Try to the next item in a queue and start
  91. processing it accordingly. If success, repeat
  92. the function or go to feed if no more items. """
  93. try:
  94. item = queue.get(False)
  95. procesar_trabajo(item)
  96. loop.add_callback(procesar_siguiente_pila)
  97. except Empty:
  98. loop.add_callback(llenar_pila)
  99. except Exception as ex:
  100. log.error(ex)
  101. loop.add_callback(procesar_siguiente_pila)
  102. def procesar_trabajo(pendiente):
  103. ciudad = pendiente['origen']
  104. estacion = pendiente['estacion']
  105. # Descarga de anuncios
  106. log.info("Descargando anuncios")
  107. try:
  108. anuncios = []
  109. id_by_ad = {}
  110. item_ids = []
  111. for i in pendiente["elementos"]:
  112. id_by_ad[i['anuncio']] = i['id']
  113. if i['id'] not in item_ids:
  114. item_ids.append(i['id'])
  115. anuncio = descargar_anuncio(i["ruta"])
  116. if anuncio is not None:
  117. log.info("Listo %s" % (i['ruta'],))
  118. anuncios.append(anuncio)
  119. except Exception as err:
  120. log.info('[process_segment] [{}] {}'.format(estacion, err))
  121. # Descarga de media
  122. log.info("Descargando media")
  123. try:
  124. media = []
  125. for i in pendiente["media"]:
  126. archivo = descargar_media(ciudad, estacion, i["ruta"])
  127. if archivo is not None:
  128. log.info("Listo %s %s %s" % (ciudad, estacion, i['ruta'],))
  129. media.append((archivo, i["fecha"], i["timestamp"]))
  130. except Exception as err:
  131. log.info(err)
  132. if len(media) == 0 or len(anuncio) == 0:
  133. log.info("No hay media o anuncios para comparar")
  134. return
  135. dejavu = None
  136. resultados = {}
  137. try:
  138. dejavu = Dejavu({"database_type": "mem"})
  139. try:
  140. x = 0
  141. for ruta, fecha, ts in media:
  142. log.info("Huellando %s" % (ruta,))
  143. dejavu.fingerprint_file(ruta, ts)
  144. except Exception as ex:
  145. log.info(ex)
  146. for anuncio in anuncios:
  147. log.info("Buscando anuncio %s" % (anuncio,))
  148. for i in dejavu.recognize(recognizer, anuncio, 10):
  149. if not "id" in i:
  150. continue
  151. if i["confidence"] < 35:
  152. continue
  153. obj = i
  154. obj["match_time"] = None
  155. nombre_anuncio = os.path.split(anuncio)[-1]
  156. id = id_by_ad[nombre_anuncio]
  157. dict = {
  158. "id": id,
  159. "anuncio": anuncio,
  160. "fecha": obj["name"],
  161. "timestamp": obj["name"] + int(obj['offset_seconds']),
  162. "confianza": obj["confidence"],
  163. "longitud": obj["length"],
  164. "desfase_segundos": obj["offset_seconds"]
  165. }
  166. if id in resultados.keys():
  167. resultados[id]["longitud"] += dict["longitud"]
  168. resultados[id]["confianza"] += dict["confianza"]
  169. continue
  170. resultados[id] = dict
  171. for id in resultados:
  172. e = resultados[id]
  173. for i in pendiente['elementos']:
  174. anuncio = e['anuncio'].replace('/tmp/ads/', '')
  175. if i['id'] == e['id'] and i['anuncio'] == anuncio:
  176. if 'encontrados' not in i:
  177. i['encontrados'] = []
  178. i['encontrados'].append(e)
  179. break
  180. log.info("[Resultado] %s" % (json.dumps(resultados)))
  181. enviar_resultados(pendiente)
  182. except Exception as ex:
  183. log.info(ex)
  184. app = setup_endpoint(queue=queue)
  185. loop = IOLoop.current()
  186. loop.add_callback(llenar_pila)
  187. if __name__ == '__main__':
  188. try:
  189. log.info('Starting ondemand service')
  190. loop.start()
  191. except KeyboardInterrupt:
  192. log.error('Process killed')