# service.py
# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import

import logging as log
import math
import os
import sqlite3
import sys
import time
from argparse import ArgumentParser
from base64 import b64decode
from binascii import hexlify
from datetime import datetime, timedelta
from multiprocessing import Process
from threading import Thread

import dateutil
import dateutil.parser
import firebase_admin
import mutagen.mp3
import requests
from firebase_admin import credentials
from firebase_admin import db as fbdb
from tornado.ioloop import IOLoop
from tornado.web import Application

from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu import Dejavu
from fourier.dejavu.recognize import FilePerSecondRecognizer
from ondemand.endpoint import setup_endpoint

if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty
  32. log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
  33. AUDIOS_PATH = '/tmp'
  34. AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
  35. MAX_SEGMENT_THREADS = 4
  36. THRESHOLD = 10
  37. SEGMENTS_TOLERANCE_RATE = 0.6
  38. FALL_TOLERANCE_SEGMENTS = 1
  39. # THRESHOLD
  40. THRESHOLD_FIXED = 1
  41. THRESHOLD_AVERAGE = 2
  42. # Modos de procesamiento de queue
  43. #  - QUEQUE_SINGLE: procesa solo un segmento a la vez
  44. # - QUEUE_THREAD: inicia un hilo para cada segmento
  45. # Por default se usará el threaded.
  46. # TOOD: hacerlo configurable por medio de argumentos
  47. # de ejecución.
  48. QUEUE_SINGLE = 1
  49. QUEUE_THREAD = 2
  50. # Se pueden usar diferentes API's
  51. # la de threading y la de multiprocessing.
  52. MultiAPI = Process
  53. config = parse_config()
  54. queue = Queue()
  55. client = Client(config['device_id'],
  56. config['apiSecret'])
  57. cloud_base_url = 'https://storage.googleapis.com/{}'\
  58. .format(config['bucket'])
  59. base_path = config.get("basepath", "/var/fourier")
  60. fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
  61. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  62. dejavu = Dejavu({"database_type":"mem"})
  63. device_id = config['device_id']
  64. device_path = os.path.join(base_path, device_id)
  65. recognizer = FilePerSecondRecognizer
  66. calibrations = {}
  67. device_ref = fbdb.reference('devices').child(config['device_id'])
  68. calibrations_ref = device_ref.child('calibrations')
  69. # settings
  70. queue_mode = QUEUE_THREAD
  71. threshold_mode = THRESHOLD_FIXED
  72. db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
  73. db = sqlite3.connect(db_path)
  74. cloud_cache = {}
  75. def feed_queue():
  76. """ Search for pending scheduled work in
  77. server and add them to a memory queue. """
  78. try:
  79. response = client.get_schedule_pending()
  80. downloaded_counter = len(response['items'])
  81. for item in response['items']:
  82. queue.put(item)
  83. if downloaded_counter:
  84. log.info(('[feed_queue] {} new '
  85. + 'pending schedule items.')\
  86. .format(downloaded_counter)
  87. )
  88. if queue.qsize() > 0:
  89. if queue_mode == QUEUE_THREAD:
  90. loop.add_callback(process_queue_with_threads)
  91. else:
  92. loop.add_callback(process_queue)
  93. else:
  94. loop.add_timeout(time.time() + 30, feed_queue)
  95. except ConnectionError as ex:
  96. log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
  97. loop.add_timeout(time.time() + 15, feed_queue)
  98. except Exception as ex:
  99. """ Errores desconocidos """
  100. log.error('[feed_queue] {}'.format(ex))
  101. loop.add_timeout(time.time() + 60, feed_queue)
  102. def process_queue():
  103. """ Try to the next item in a queue and start
  104. processing it accordingly. If success, repeat
  105. the function or go to feed if no more items. """
  106. try:
  107. item = queue.get(False)
  108. process_segment(item)
  109. loop.add_callback(process_queue)
  110. except Empty:
  111. loop.add_callback(feed_queue)
  112. except Exception as ex:
  113. log.error(ex)
  114. loop.add_callback(process_queue)
  115. def process_queue_with_threads():
  116. threads = [None] * MAX_SEGMENT_THREADS
  117. is_drained = False
  118. log.info('Starting thread processing')
  119. while True:
  120. for index, t in enumerate(threads):
  121. if not t:
  122. try:
  123. item = queue.get(False)
  124. station = item['station']
  125. date = dateutil.parser.parse(item['date'])
  126. thread = MultiAPI(target=process_segment,
  127. args=(item,),
  128. kwargs={
  129. 'audios': [f for f in iterate_audios(date, station)]
  130. }
  131. )
  132. threads[index] = thread
  133. thread.start()
  134. except Empty:
  135. is_drained = True
  136. elif not t.is_alive():
  137. threads[index] = None
  138. if is_drained:
  139. if threads.count(None) == MAX_SEGMENT_THREADS:
  140. break
  141. log.info('Finished thread processing')
  142. loop.add_callback(feed_queue)
  143. def process_segment(item, audios=None):
  144. """ Procesa una hora de audio """
  145. station = item['station']
  146. date = dateutil.parser.parse(item['date'])
  147. log.info('processing segment: {}'.format(item))
  148. # 0. Obtener la información de calibración
  149. calibration = calibrations_ref.child(station)
  150. print(calibration)
  151. # 1. obtener el audio desde firebase
  152. # y calcular su fingerprint.
  153. filename, md5hash = cloud_download(ad_key=item['ad'])
  154. if not filename:
  155. log.info('ad file missing')
  156. return
  157. # 1.1 Calcular el número de segmentos requeridos
  158. # de acuerdo a la duración total del audio.
  159. try:
  160. audio = mutagen.mp3.MP3(filename)
  161. segments_needed = int(round(float(audio.info.length) / float(5)))
  162. segments_needed = int(round(
  163. segments_needed * SEGMENTS_TOLERANCE_RATE
  164. ))
  165. except Exception as ex:
  166. log.error('file {} is not an mp3'.format(filename))
  167. log.error(str(ex))
  168. return
  169. try:
  170. dejavu.fingerprint_file(filename)
  171. except Exception as ex:
  172. log.error('cannot fingerprint: {}'.format(ex))
  173. """ Hay dos posibles escensarios al obtener los audios
  174. a. Los audios vienen por el parámetro "audios" de la
  175. función, siendo esta una lista.
  176. b. Los audios se obtienen directamente de la base
  177. de datos en modo de cursor.
  178. """
  179. audios_iterable = audios if audios \
  180. else iterate_audios(date, station)
  181. # 2. Read the list of files from local database
  182. audios_counter = 0
  183. results = []
  184. try:
  185. for path, name, ts in audios_iterable:
  186. log.info('file: {}'.format(path))
  187. audios_counter += os.path.isfile(path)
  188. values = []
  189. for match in dejavu.recognize(recognizer, path, 5,
  190. ads_filter=[md5hash]):
  191. results.append({
  192. 'confidence': match['confidence'],
  193. 'timestamp': ts,
  194. 'offset': match['offset']
  195. })
  196. values.append(str(match['confidence']))
  197. ts += match['length'] / 1000
  198. log.info('{2} ({3}/{0}) {1}'.format(
  199. os.path.split(path)[-1],
  200. ','.join(values),
  201. item['ad'],
  202. station,
  203. ))
  204. except dejavu.CouldntDecodeError as ex:
  205. log.error('[process_segment] {}'.format(ex))
  206. try:
  207. response = client.put_schedule_results(
  208. item['schedule'],
  209. item['id'],
  210. None, # TODO: send results again
  211. found=find_repetitions(results,
  212. segments_needed=segments_needed
  213. ),
  214. missing_files=(12 - audios_counter) \
  215. if audios_counter < 12 else 0
  216. )
  217. log.info('API response: {}'.format(response))
  218. except ConnectionError as ex:
  219. log.error(str(ex))
  220. except UserWarning as warn:
  221. log.warning(str(warn))
  222. def find_repetitions(results, segments_needed=2):
  223. found_counter = 0
  224. found_index = None
  225. expect_space = False
  226. expect_recover = False
  227. last_value_in_threshold_index = -1
  228. found = []
  229. if threshold_mode == THRESHOLD_FIXED:
  230. threshold = THRESHOLD
  231. elif threshold_mode == THRESHOLD_AVERAGE:
  232. values = [x['confidence'] for x in results]
  233. threshold = math.ceil(float(sum(values)) / float(len(values)))
  234. if segments_needed < 1:
  235. segments_needed = 1
  236. for index, result in enumerate(results):
  237. if not expect_space:
  238. if result['confidence'] >= threshold:
  239. found_counter += 1
  240. last_value_in_threshold_index = index
  241. if found_index is None:
  242. found_index = index
  243. if expect_recover:
  244. expect_recover = False
  245. elif FALL_TOLERANCE_SEGMENTS:
  246. if not expect_recover:
  247. if last_value_in_threshold_index != -1:
  248. """ Solo cuando ya haya entrado por lo menos
  249. un valor en el rango del threshold, es cuando
  250. se podrá esperar un valor bajo """
  251. expect_recover = True
  252. found_counter += 1
  253. else:
  254. pass
  255. else:
  256. """ Si después de haber pasado tolerado 1 elemento
  257. vuelve a salir otro fuera del threshold continuo,
  258. entonces ya se da por perdido """
  259. found_counter = 0
  260. found_index = None
  261. expect_recover = False
  262. else:
  263. found_counter = 0
  264. found_index = None
  265. expect_recover = False
  266. else:
  267. if result['confidence'] <= threshold:
  268. expect_space = False
  269. if found_counter >= segments_needed:
  270. found.append(results[found_index]['timestamp'])
  271. found_counter = 0
  272. expect_space = True
  273. return found
  274. def iterate_audios(dt, station):
  275. """ Given a datetime object and an station,
  276. iterate a list of files that are between
  277. the the date and itself plus 5 minutes;
  278. station must match too """
  279. from_time = time.mktime(dt.timetuple()) \
  280. - AHEAD_TIME_AUDIO_TOLERANCE
  281. to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
  282. log.info('from {} to {}'.format(int(from_time), int(to_time)))
  283. cursor = db.cursor()
  284. cursor.execute((
  285. 'select "filename", "timestamp" '
  286. 'from "file" '
  287. 'where "timestamp" between ? and ? '
  288. 'and "station" = ? '
  289. 'order by "timestamp" asc'
  290. ),
  291. (from_time, to_time, station, ),
  292. )
  293. files = [file for file in cursor]
  294. cursor.close()
  295. for mp3 in files:
  296. mp3path, ts = mp3
  297. mp3name = os.path.basename(mp3path)
  298. yield (mp3path, mp3name, ts)
  299. def cloud_download(ad_key=None):
  300. """ Given an ad key, the file is downloaded to
  301. the system temporal folder to be processed """
  302. if ad_key in cloud_cache:
  303. """ If this file has already been downloaded,
  304. will not be downloaded again, instead will
  305. be taken from cloud_cache dictionary """
  306. filename, md5hash = cloud_cache[ad_key]
  307. if os.path.isfile(filename):
  308. return filename, md5hash
  309. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  310. filename = os.path.basename(ad['path'])
  311. out_file = os.path.join(AUDIOS_PATH, filename)
  312. url = '{}/{}'.format(cloud_base_url, ad['path'])
  313. response = requests.get(url)
  314. if response.status_code == 200:
  315. hashes = response.headers['x-goog-hash']
  316. hashes = hashes.split(',')
  317. hashes = [h.split('=', 1) for h in hashes]
  318. hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
  319. md5sum = hashes['md5']
  320. with open(out_file, "wb") as fp:
  321. fp.write(response.content)
  322. tp = (out_file, md5sum,)
  323. cloud_cache[ad_key] = tp
  324. return tp
  325. app = setup_endpoint(queue=queue)
  326. loop = IOLoop.current()
  327. loop.add_callback(feed_queue)
  328. if __name__ == '__main__':
  329. try:
  330. log.info('Starting ondemand service')
  331. loop.start()
  332. except KeyboardInterrupt:
  333. log.error('Process killed')