service1.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. # -*- coding: utf8 -*-
  2. from __future__ import print_function, absolute_import
  3. from tornado.ioloop import IOLoop
  4. from tornado.web import Application
  5. from client import Client, ConnectionError
  6. from boxconfig import parse_config
  7. from dejavu.recognize import FilePerSecondRecognizer
  8. from datetime import datetime, timedelta
  9. from endpoint import setup_endpoint
  10. from calibration import Calibrations
  11. from dejavu import Dejavu, CouldntDecodeError
  12. from firebase_admin import credentials
  13. from firebase_admin import db as fbdb
  14. from multiprocessing import Process
  15. from subprocess import Popen, PIPE
  16. import logging as log
  17. import firebase_admin
  18. import mutagen.mp3
  19. import math
  20. import sys
  21. import os
  22. if sys.version_info >= (3, 0):
  23. from queue import Queue, Empty
  24. else:
  25. from Queue import Queue, Empty
  26. log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
  27. AUDIOS_PATH = '/tmp'
  28. AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
  29. MAX_SEGMENT_THREADS = 4
  30. THRESHOLD = 10
  31. SEGMENTS_TOLERANCE_RATE = 0.6
  32. FALL_TOLERANCE_SEGMENTS = 1
  33. # THRESHOLD
  34. THRESHOLD_FIXED = 1
  35. THRESHOLD_AVERAGE = 2
  36. # Modos de procesamiento de queue
  37. # - QUEQUE_SINGLE: procesa solo un segmento a la vez
  38. # - QUEUE_THREAD: inicia un hilo para cada segmento
  39. # Por default se usará el threaded.
  40. # TODO: hacerlo configurable por medio de argumentos
  41. # de ejecución.
  42. QUEUE_SINGLE = 1
  43. QUEUE_THREAD = 2
  44. # Se pueden usar diferentes API'se
  45. # la de threading y la de multiprocessing.
  46. MultiAPI = Process
  47. config = parse_config()
  48. queue = Queue()
  49. client = Client(config['device_id'],
  50. config['apiSecret'])
  51. cloud_base_url = 'https://storage.googleapis.com/{}' \
  52. .format(config['bucket'])
  53. base_path = config.get("basepath", "/var/fourier")
  54. fb_credentials = credentials.Certificate('/code/Fourier-key.json')
  55. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  56. device_id = config['device_id']
  57. device_path = os.path.join(base_path, device_id)
  58. recognizer = FilePerSecondRecognizer
  59. device_ref = fbdb.reference('devices').child(config['device_id'])
  60. calibrations = Calibrations(config['device_id'], client=client)
  61. # settings
  62. queue_mode = QUEUE_SINGLE
  63. threshold_mode = THRESHOLD_FIXED
  64. db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
  65. #db = sqlite3.connect(db_path)
  66. cloud_cache = {}
  67. def process_segment(audios=None, calibration=None):
  68. """ Procesa una hora de audio """
  69. #date = dateutil.parser.parse(item['fecha'], ignoretz=True)
  70. segment_size = 5
  71. audio_length = 0
  72. # 1.1 Calcular el número de segmentos requeridos
  73. # de acuerdo a la duración total del audio.
  74. try:
  75. filename = "/tmp/anuncios/-MOiAvmUkZLmXrAWCy9u.mp3"
  76. audio = mutagen.mp3.MP3(filename)
  77. audio_length = audio.info.length
  78. if segment_size == 'integer':
  79. segment_size = int(audio_length)
  80. elif segment_size == 'ceil':
  81. segment_size = int(math.ceil(audio_length / 5)) * 5
  82. segments_needed = int(round(float(audio_length) / float(segment_size)))
  83. segments_needed = int(round(segments_needed * 0.8))
  84. except Exception as ex:
  85. #log.error('[process_segment] file {} is not an mp3'.format(filename))
  86. log.error(str(ex))
  87. return
  88. dejavu = Dejavu({"database_type": "mem"})
  89. try:
  90. dejavu.fingerprint_file(filename)
  91. except Exception as ex:
  92. log.error('[process_segment] cannot fingerprint: {}'.format(ex))
  93. # 2. Read the list of files from local database
  94. audios_counter = 0
  95. results = []
  96. v = []
  97. audios_iterable = []
  98. for path, name, ts in audios_iterable:
  99. audios_counter += os.path.isfile(path)
  100. values = []
  101. try:
  102. for match in dejavu.recognize(recognizer, path, segment_size):
  103. name = None
  104. ad = None
  105. results.append({
  106. 'ad': ad,
  107. 'confidence': match['confidence'],
  108. 'timestamp': ts,
  109. 'offset': match['offset'],
  110. 'name': name
  111. })
  112. values.append(str(match['confidence']))
  113. ts += match['length'] / 1000
  114. v.append(','.join(values))
  115. log.info('[process_segment] {0}) {1}'.format(
  116. os.path.split(path)[-1],
  117. ','.join(values),
  118. ))
  119. except CouldntDecodeError as ex:
  120. log.error('[process_segment] {}'.format(ex))
  121. try:
  122. encontrados = {}
  123. for i in item_ids:
  124. r = [result for result in results if result["name"] == i]
  125. encontrados[i] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration,)
  126. #for id in encontrados:
  127. # for e in encontrados[id]:
  128. # for i in item['elementos']:
  129. # if i['id'] == id and i['anuncio'] == e['ad']:
  130. # if 'encontrados' not in i:
  131. # i['encontrados'] = []
  132. # i['encontrados'].append(e)
  133. # break
  134. #item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
  135. except ConnectionError as ex:
  136. log.error('[process_segment] {}'.format(str(ex)))
  137. except UserWarning as warn:
  138. log.warning(str(warn))
  139. def find_repetitions(results, segments_needed=2, calibration=None):
  140. found_counter = 0
  141. found_down_counter = 0
  142. found_index = None
  143. expect_space = False
  144. expect_recover = False
  145. last_value_in_threshold_index = -1
  146. fall_tolerance = calibration['fallTolerance']
  147. found = []
  148. high = 100 # Obtener este valor desde un parámetro
  149. middle_high = 50 # Obtener este valor desde un parámetro
  150. segment_middle_needed = 2 # Obtener este valor desde un parámetro
  151. found_high = None
  152. found_middle_high = []
  153. if threshold_mode == THRESHOLD_FIXED:
  154. threshold = calibration['threshold']
  155. elif threshold_mode == THRESHOLD_AVERAGE:
  156. values = [x['confidence'] for x in results]
  157. threshold = math.ceil(float(sum(values)) / float(len(values)))
  158. if segments_needed < 1:
  159. segments_needed = 1
  160. for index, result in enumerate(results):
  161. #if result['confidence'] >= high:
  162. # if found_high is None:
  163. # found_high = index
  164. # elif result['confidence'] > results[found_high]['confidence']:
  165. # found_high = index
  166. #elif result['confidence'] >= middle_high:
  167. # found_middle_high.append(index)
  168. if not expect_space:
  169. if result['confidence'] >= threshold:
  170. found_counter += 1
  171. last_value_in_threshold_index = index
  172. if found_index is None:
  173. found_index = index
  174. if expect_recover:
  175. found_counter += found_down_counter
  176. expect_recover = False
  177. elif fall_tolerance:
  178. if not expect_recover:
  179. if last_value_in_threshold_index != -1:
  180. """ Solo cuando ya haya entrado por lo menos
  181. un valor en el rango del threshold, es cuando
  182. se podrá esperar un valor bajo """
  183. expect_recover = True
  184. found_down_counter += 1
  185. else:
  186. pass
  187. else:
  188. """ Si después de haber pasado tolerado 1 elemento
  189. vuelve a salir otro fuera del threshold continuo,
  190. entonces ya se da por perdido """
  191. found_counter = 0
  192. found_down_counter = 0
  193. found_index = None
  194. expect_recover = False
  195. else:
  196. found_counter = 0
  197. found_down_counter = 0
  198. found_index = None
  199. expect_recover = False
  200. # Aquí veremos si hay un valor alto
  201. #if found_high is not None:
  202. # found_row = results[found_high]
  203. # found.append(found_row)
  204. #elif len(found_middle_high) >= segment_middle_needed:
  205. # found_row = results[found_middle_high[0]]
  206. # found.append(found_row)
  207. #found_high = None
  208. #found_middle_high = []
  209. else:
  210. if result['confidence'] <= threshold:
  211. expect_space = False
  212. if found_counter >= segments_needed:
  213. found_row = results[found_index]
  214. found.append(found_row)
  215. found_counter = 0
  216. expect_space = True
  217. #found_high = None
  218. #found_middle_high = []
  219. return found
  220. app = setup_endpoint(queue=queue)
  221. loop = IOLoop.current()
  222. loop.add_callback(process_segment)
  223. if __name__ == '__main__':
  224. try:
  225. log.info('Starting ondemand service')
  226. loop.start()
  227. except KeyboardInterrupt:
  228. log.error('Process killed')