# ondemand.py
  1. from __future__ import print_function, absolute_import
  2. import warnings
  3. warnings.simplefilter("ignore", UserWarning)
  4. from tornado.ioloop import IOLoop
  5. from boxconfig import parse_config
  6. from dejavu.recognize import FilePerSecondRecognizer
  7. from dejavu import Dejavu
  8. from endpoint import setup_endpoint
  9. import logging as log
  10. import requests
  11. import json
  12. import time
  13. import os
  14. from queue import Queue, Empty
  15. log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
  16. PATH = '/tmp'
  17. AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
  18. MAX_SEGMENT_THREADS = 4
  19. THRESHOLD = 10
  20. SEGMENTS_TOLERANCE_RATE = 0.6
  21. FALL_TOLERANCE_SEGMENTS = 1
  22. THRESHOLD_FIXED = 1
  23. THRESHOLD_AVERAGE = 2
  24. QUEUE_SINGLE = 1
  25. config = parse_config()
  26. queue = Queue()
  27. recognizer = FilePerSecondRecognizer
  28. threshold_mode = THRESHOLD_FIXED
  29. def obt_siguiente_trabajo():
  30. url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)
  31. response = requests.get(url)
  32. return response.json()
  33. def descargar_anuncio(ad_path):
  34. anuncio = os.path.basename(ad_path)
  35. path = os.path.join(PATH, 'ads')
  36. os.makedirs(path, exist_ok=True)
  37. ruta_anuncio = os.path.join(path, anuncio)
  38. if os.path.isfile(ruta_anuncio):
  39. return ruta_anuncio
  40. cloud_base_url = 'https://storage.googleapis.com/{}' \
  41. .format(config['bucket'])
  42. url = '{}/{}'.format(cloud_base_url, ad_path)
  43. response = requests.get(url)
  44. # TODO: Agregar alerta cuando la respuesta no sea 200
  45. if response.status_code == 200:
  46. with open(ruta_anuncio, "wb") as fp:
  47. fp.write(response.content)
  48. return ruta_anuncio
  49. else:
  50. log.info("Error al descargar")
  51. log.info(response)
  52. return None
  53. def descargar_media(box, station, media):
  54. ref = '{}/{}/{}'.format(box, station, media)
  55. file = os.path.basename(ref)
  56. path = os.path.join(PATH, 'fourier', box, station)
  57. os.makedirs(path, exist_ok=True)
  58. out_file = os.path.join(path, file)
  59. if os.path.isfile(out_file):
  60. return out_file
  61. filename = ref.replace("/","%2F") \
  62. .replace("+","%2B")
  63. cloud_base_url = '%s%s' % (
  64. 'https://firebasestorage.googleapis.com',
  65. '/v0/b/fourier-6e14d.appspot.com/o/'
  66. )
  67. url = '{}/{}?alt=media'.format(cloud_base_url, filename)
  68. response = requests.get(url)
  69. if response.status_code == 200:
  70. with open(out_file, "wb") as fp:
  71. fp.write(response.content)
  72. return out_file
  73. else:
  74. log.info("Error al descargar")
  75. log.info(response)
  76. return None
  77. def obt_calibracion(calibracion):
  78. default = {
  79. 'threshold': 12,
  80. 'tolerance': 0.8,
  81. 'fallTolerance': 1,
  82. 'segmentSize': 5,
  83. }
  84. if 'threshold' in calibracion:
  85. default['threshold'] = calibracion['threshold']
  86. if 'tolerance' in calibracion:
  87. default['tolerance'] = calibracion['tolerance']
  88. if 'segmentSize' in calibracion:
  89. default['segmentSize'] = calibracion['segmentSize']
  90. if 'fallTolerance' in calibracion:
  91. default['fallTolerance'] = calibracion['fallTolerance']
  92. return default
  93. def enviar_resultados(trabajo):
  94. print('[resultados] %s' % (json.dumps(trabajo),))
  95. url = 'https://api.fourier.audio/na/calendario/resultado'
  96. response = requests.post(url, json=trabajo)
  97. return response
  98. def llenar_pila():
  99. """ Search for pending scheduled work in
  100. server and add them to a memory queue. """
  101. try:
  102. response = obt_siguiente_trabajo()
  103. if len(response["elementos"]) > 0:
  104. queue.put(response)
  105. if queue.qsize() > 0:
  106. loop.add_callback(procesar_siguiente_pila)
  107. else:
  108. loop.add_timeout(time.time() + 30, llenar_pila)
  109. except Exception as ex:
  110. """ Errores desconocidos """
  111. log.error('[feed_queue] {}'.format(ex))
  112. loop.add_timeout(time.time() + 60, llenar_pila)
  113. raise ex
  114. def procesar_siguiente_pila():
  115. """ Try to the next item in a queue and start
  116. processing it accordingly. If success, repeat
  117. the function or go to feed if no more items. """
  118. try:
  119. item = queue.get(False)
  120. procesar_trabajo(item)
  121. loop.add_callback(procesar_siguiente_pila)
  122. except Empty:
  123. loop.add_callback(llenar_pila)
  124. except Exception as ex:
  125. log.error(ex)
  126. loop.add_callback(procesar_siguiente_pila)
  127. def procesar_trabajo(pendiente):
  128. ciudad = pendiente['origen']
  129. estacion = pendiente['estacion']
  130. # Descarga de anuncios
  131. try:
  132. anuncios = []
  133. id_by_ad = {}
  134. item_ids = []
  135. for i in pendiente["elementos"]:
  136. id_by_ad[i['anuncio']] = i['id']
  137. if i['id'] not in item_ids:
  138. item_ids.append(i['id'])
  139. anuncio = descargar_anuncio(i["ruta"])
  140. if anuncio is not None:
  141. anuncios.append(anuncio)
  142. except Exception as err:
  143. log.info('[process_segment] [{}] {}'.format(estacion, err))
  144. # Descarga de media
  145. try:
  146. media = []
  147. for i in pendiente["media"]:
  148. archivo = descargar_media(ciudad, estacion, i["ruta"])
  149. if archivo is not None:
  150. media.append((archivo, i["fecha"], i["timestamp"]))
  151. except Exception as err:
  152. log.info(err)
  153. dejavu = None
  154. resultados = {}
  155. try:
  156. dejavu = Dejavu({"database_type": "mem"})
  157. try:
  158. x = 0
  159. for ruta, fecha, ts in media:
  160. log.info("Huellando %s" % (ruta,))
  161. dejavu.fingerprint_file(ruta, ts)
  162. except Exception as ex:
  163. log.info(ex)
  164. for anuncio in anuncios:
  165. log.info("Buscando anuncio %s" % (anuncio,))
  166. for i in dejavu.recognize(recognizer, anuncio, 5):
  167. if not "id" in i:
  168. continue
  169. if i["confidence"] < 50:
  170. continue
  171. obj = i
  172. obj["match_time"] = None
  173. nombre_anuncio = os.path.split(anuncio)[-1]
  174. id = id_by_ad[nombre_anuncio]
  175. dict = {
  176. "id": id,
  177. "anuncio": anuncio,
  178. "fecha": obj["name"],
  179. "timestamp": obj["name"] + int(obj['offset_seconds']),
  180. "confianza": obj["confidence"],
  181. "longitud": obj["length"],
  182. "desfase_segundos": obj["offset_seconds"]
  183. }
  184. if i["id"] in resultados.keys():
  185. resultados[i["id"]]["longitud"] = resultados[i["id"]]["longitud"] + dict["longitud"]
  186. resultados[i["id"]]["confianza"] = resultados[i["id"]]["confianza"] + dict["confianza"]
  187. continue
  188. resultados[i["id"]] = dict
  189. for id in resultados:
  190. e = resultados[id]
  191. for i in pendiente['elementos']:
  192. anuncio = e['anuncio'].replace('/tmp/ads/', '')
  193. if i['id'] == id and i['anuncio'] == anuncio:
  194. if 'encontrados' not in i:
  195. i['encontrados'] = []
  196. i['encontrados'].append(e)
  197. break
  198. enviar_resultados(pendiente)
  199. except Exception as ex:
  200. log.info(ex)
# NOTE(review): the next three statements run on import as well, not only
# when this file is executed as a script.
# HTTP endpoint; it receives the same `queue` the processing callbacks
# consume (presumably so it can enqueue jobs itself -- confirm in endpoint).
app = setup_endpoint(queue=queue)
# Module-level loop referenced by llenar_pila / procesar_siguiente_pila.
loop = IOLoop.current()
# First poll for pending work happens as soon as the loop starts.
loop.add_callback(llenar_pila)
if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        # Blocks until the loop is stopped or interrupted.
        loop.start()
    except KeyboardInterrupt:
        # Ctrl-C / SIGINT ends the service.
        log.error('Process killed')