# service.py — on-demand audio recognition service (approx. 9.4 KB)
# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import

import logging as log
import os
import sqlite3
import sys
import time
from argparse import ArgumentParser
from base64 import b64decode
from binascii import hexlify
from datetime import datetime, timedelta
from threading import Thread
from Queue import Queue, Empty

import dateutil
import dateutil.parser  # required: `import dateutil` alone does not expose `dateutil.parser`
import firebase_admin
import mutagen.mp3
import requests
from firebase_admin import credentials
from firebase_admin import db as fbdb
from tornado.ioloop import IOLoop
from tornado.web import Application

from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu import Dejavu
from fourier.dejavu.recognize import FilePerSecondRecognizer
from ondemand.endpoint import setup_endpoint
  27. log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
  28. AUDIOS_PATH = '/tmp'
  29. AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
  30. MAX_SEGMENT_THREADS = 2
  31. # Modos de procesamiento de queue
  32. #  - QUEQUE_SINGLE: procesa solo un segmento a la vez
  33. # - QUEUE_THREAD: inicia un hilo para cada segmento
  34. # Por default se usará el threaded.
  35. # TOOD: hacerlo configurable por medio de argumentos
  36. # de ejecución.
  37. QUEUE_SINGLE = 1
  38. QUEUE_THREAD = 2
  39. config = parse_config()
  40. queue = Queue()
  41. client = Client(config['device_id'],
  42. config['apiSecret'])
  43. cloud_base_url = 'https://storage.googleapis.com/{}'\
  44. .format(config['bucket'])
  45. base_path = config.get("basepath", "/var/fourier")
  46. fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
  47. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  48. dejavu = Dejavu({"database_type":"mem"})
  49. device_id = config['device_id']
  50. device_path = os.path.join(base_path, device_id)
  51. recognizer = FilePerSecondRecognizer
  52. queue_mode = QUEUE_THREAD
  53. db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
  54. db = sqlite3.connect(db_path)
  55. cloud_cache = {}
  56. def feed_queue():
  57. """ Search for pending scheduled work in
  58. server and add them to a memory queue. """
  59. try:
  60. response = client.get_schedule_pending()
  61. downloaded_counter = len(response['items'])
  62. for item in response['items']:
  63. queue.put(item)
  64. if downloaded_counter:
  65. log.info(('[feed_queue] {} new '
  66. + 'pending schedule items.')\
  67. .format(downloaded_counter)
  68. )
  69. if queue.qsize() > 0:
  70. if queue_mode == QUEUE_THREAD:
  71. loop.add_callback(process_queue_with_threads)
  72. else:
  73. loop.add_callback(process_queue)
  74. else:
  75. loop.add_timeout(time.time() + 30, feed_queue)
  76. except ConnectionError as ex:
  77. log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
  78. loop.add_timeout(time.time() + 15, feed_queue)
  79. except Exception as ex:
  80. """ Errores desconocidos """
  81. log.error('[feed_queue] {}'.format(ex))
  82. loop.add_timeout(time.time() + 60, feed_queue)
  83. def process_queue():
  84. """ Try to the next item in a queue and start
  85. processing it accordingly. If success, repeat
  86. the function or go to feed if no more items. """
  87. try:
  88. item = queue.get(False)
  89. process_segment(item)
  90. loop.add_callback(process_queue)
  91. except Empty:
  92. loop.add_callback(feed_queue)
  93. except Exception as ex:
  94. log.error(ex)
  95. loop.add_callback(process_queue)
  96. def process_queue_with_threads():
  97. threads = [None] * MAX_SEGMENT_THREADS
  98. is_drained = False
  99. log.info('Starting thread processing')
  100. while True:
  101. for index, t in enumerate(threads):
  102. if not t:
  103. try:
  104. item = queue.get(False)
  105. thread = Thread(target=process_segment, args=(item,))
  106. threads[index] = thread
  107. thread.start()
  108. except Empty:
  109. is_drained = True
  110. elif not t.is_alive():
  111. threads[index] = None
  112. if is_drained:
  113. if threads.count(None) == MAX_SEGMENT_THREADS:
  114. break
  115. log.info('Finished thread processing')
  116. loop.add_callback(feed_queue)
  117. def process_segment(item):
  118. """ Procesa una hora de audio """
  119. station = item['station']
  120. date = dateutil.parser.parse(item['date'])
  121. log.info('processing segment: {}'.format(item))
  122. # 1. obtener el audio desde firebase
  123. # y calcular su fingerprint.
  124. filename, md5hash = cloud_download(ad_key=item['ad'])
  125. if not filename:
  126. log.info('ad file missing')
  127. return
  128. # 1.1 Calcular el número de segmentos requeridos
  129. # de acuerdo a la duración total del audio.
  130. try:
  131. audio = mutagen.mp3.MP3(filename)
  132. segments_needed = int(round(float(audio.info.length) / float(5)))
  133. except Exception as ex:
  134. log.error('file {} is not an mp3'.format(audio))
  135. log.error(str(ex))
  136. return
  137. try:
  138. dejavu.fingerprint_file(filename)
  139. except Exception as ex:
  140. log.error('cannot fingerprint: {}'.format(ex))
  141. # 2. Read the list of files from local database
  142. audios_counter = 0
  143. results = []
  144. for path, name, ts in iterate_audios(date, station):
  145. log.info('file: {}'.format(path))
  146. audios_counter += os.path.isfile(path)
  147. for match in dejavu.recognize(recognizer, path, 5,
  148. ads_filter=[md5hash]):
  149. try:
  150. results.append({
  151. 'confidence': match['confidence'],
  152. 'timestamp': ts,
  153. 'offset': match['offset']
  154. })
  155. log.info("{} {}".format(ts, match['confidence']))
  156. except KeyError as ex:
  157. log.error(str(ex))
  158. ts += match['length'] / 1000
  159. try:
  160. response = client.put_schedule_results(
  161. item['schedule'],
  162. item['id'],
  163. None, # TODO: send results again
  164. found=find_repetitions(results,
  165. segments_needed=segments_needed
  166. ),
  167. missing_files=(12 - audios_counter) \
  168. if audios_counter < 12 else 0
  169. )
  170. log.info('API response: {}'.format(response))
  171. except ConnectionError as ex:
  172. log.error(str(ex))
  173. except UserWarning as warn:
  174. log.warning(str(warn))
  175. def find_repetitions(results, segments_needed=2):
  176. found_counter = 0
  177. found_index = None
  178. seconds_needed = 9
  179. threshold = 20
  180. expect_space = False
  181. found = []
  182. if segments_needed < 1:
  183. segments_needed = 1
  184. for index, result in enumerate(results):
  185. if not expect_space:
  186. if result['confidence'] > threshold:
  187. found_counter += 1
  188. if found_index is None:
  189. found_index = index
  190. else:
  191. found_counter = 0
  192. found_index = None
  193. else:
  194. if result['confidence'] <= threshold:
  195. expect_space = False
  196. if found_counter >= segments_needed:
  197. found.append(results[found_index]['timestamp'])
  198. found_counter = 0
  199. expect_space = True
  200. return found
  201. def iterate_audios(dt, station):
  202. """ Given a datetime object and an station,
  203. iterate a list of files that are between
  204. the the date and itself plus 5 minutes;
  205. station must match too """
  206. from_time = time.mktime(dt.timetuple()) \
  207. - AHEAD_TIME_AUDIO_TOLERANCE
  208. to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
  209. log.info('from {} to {}'.format(int(from_time), int(to_time)))
  210. cursor = db.cursor()
  211. cursor.execute((
  212. 'select "filename", "timestamp" '
  213. 'from "file" '
  214. 'where "timestamp" between ? and ? '
  215. 'and "station" = ? '
  216. 'order by "timestamp" asc'
  217. ),
  218. (from_time, to_time, station, ),
  219. )
  220. files = [file for file in cursor]
  221. cursor.close()
  222. for mp3 in files:
  223. mp3path, ts = mp3
  224. mp3name = os.path.basename(mp3path)
  225. yield (mp3path, mp3name, ts)
  226. def cloud_download(ad_key=None):
  227. """ Given an ad key, the file is downloaded to
  228. the system temporal folder to be processed """
  229. if ad_key in cloud_cache:
  230. """ If this file has already been downloaded,
  231. will not be downloaded again, instead will
  232. be taken from cloud_cache dictionary """
  233. filename, md5hash = cloud_cache[ad_key]
  234. if os.path.isfile(filename):
  235. return filename, md5hash
  236. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  237. filename = os.path.basename(ad['path'])
  238. out_file = os.path.join(AUDIOS_PATH, filename)
  239. url = '{}/{}'.format(cloud_base_url, ad['path'])
  240. response = requests.get(url)
  241. if response.status_code == 200:
  242. hashes = response.headers['x-goog-hash']
  243. hashes = hashes.split(',')
  244. hashes = [h.split('=', 1) for h in hashes]
  245. hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
  246. md5sum = hashes['md5']
  247. with open(out_file, "wb") as fp:
  248. fp.write(response.content)
  249. tp = (out_file, md5sum,)
  250. cloud_cache[ad_key] = tp
  251. return tp
  252. app = setup_endpoint(queue=queue)
  253. loop = IOLoop.current()
  254. loop.add_callback(feed_queue)
  255. if __name__ == '__main__':
  256. try:
  257. log.info('Starting ondemand service')
  258. loop.start()
  259. except KeyboardInterrupt:
  260. log.error('Process killed')