# service.py
# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import

import os
import sqlite3
import sys
import time
from base64 import b64decode
from binascii import hexlify
from datetime import datetime, timedelta
from Queue import Queue, Empty

import dateutil
import dateutil.parser
import firebase_admin
import requests
from firebase_admin import credentials
from firebase_admin import db as fbdb
from tornado.ioloop import IOLoop
from tornado.web import Application

from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu import Dejavu
from fourier.dejavu.recognize import FilePerSecondRecognizer
from ondemand.endpoint import setup_endpoint
  23. AUDIOS_PATH = '/tmp'
  24. AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
  25. config = parse_config()
  26. queue = Queue()
  27. client = Client(config['device_id'],
  28. config['apiSecret'])
  29. cloud_base_url = 'https://storage.googleapis.com/{}'\
  30. .format(config['bucket'])
  31. base_path = config.get("basepath", "/var/fourier")
  32. fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
  33. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  34. dejavu = Dejavu({"database_type":"mem"})
  35. device_id = config['device_id']
  36. device_path = os.path.join(base_path, device_id)
  37. recognizer = FilePerSecondRecognizer
  38. db_path = os.path.join(device_path, 'files.db')
  39. db = sqlite3.connect(db_path)
  40. cloud_cache = {}
  41. def feed_queue():
  42. """ Search for pending scheduled work in
  43. server and add them to a memory queue. """
  44. try:
  45. response = client.get_schedule_pending()
  46. for item in response['items']:
  47. queue.put(item)
  48. if queue.qsize() > 0:
  49. loop.add_callback(process_queue)
  50. else:
  51. loop.add_timeout(time.time() + 30, feed_queue)
  52. except ConnectionError as ex:
  53. print('cannot feed: {}, retryig later'.format(ex),
  54. file=sys.stderr)
  55. loop.add_timeout(time.time() + 15, feed_queue)
  56. except Exception as ex:
  57. """ Errores desconocidos """
  58. print(ex, file=sys.stderr)
  59. loop.add_timeout(time.time() + 60, feed_queue)
  60. def process_queue():
  61. """ Try to the next item in a queue and start
  62. processing it accordingly. If success, repeat
  63. the function or go to feed if no more items. """
  64. try:
  65. item = queue.get(False)
  66. process_segment(item)
  67. loop.add_callback(process_queue)
  68. except Empty:
  69. loop.add_callback(feed_queue)
  70. except Exception as ex:
  71. print(str(ex), file=sys.stderr)
  72. loop.add_callback(process_queue)
  73. def process_segment(item):
  74. """ Procesa una hora de audio """
  75. station = item['station']
  76. date = dateutil.parser.parse(item['date'])
  77. # 1. obtener el audio desde firebase
  78. # y calcular su fingerprint.
  79. filename, md5hash = cloud_download(ad_key=item['ad'])
  80. if not filename:
  81. print('ad file missing', file=sys.stderr)
  82. return
  83. try:
  84. dejavu.fingerprint_file(filename)
  85. except Exception as ex:
  86. print('cannot fingerprint: {}'.format(ex))
  87. # 2. Read the list of files from local database
  88. audios_counter = 0
  89. results = []
  90. for path, name, ts in iterate_audios(date, station):
  91. print(path)
  92. audios_counter += os.path.isfile(path)
  93. for match in dejavu.recognize(recognizer, path, 5,
  94. ads_filter=[md5hash]):
  95. try:
  96. results.append({
  97. 'confidence': match['confidence'],
  98. 'timestamp': ts,
  99. 'offset': match['offset']
  100. })
  101. print("{} {}".format(ts, match['confidence']))
  102. except KeyError as ex:
  103. print(ex, file=sys.stderr)
  104. ts += match['length'] / 1000
  105. try:
  106. response = client.put_schedule_results(
  107. item['schedule'],
  108. item['id'],
  109. None, # TODO: send results again
  110. found=find_repetitions(results),
  111. missing_files=(12 - audios_counter) \
  112. if audios_counter < 12 else 0
  113. )
  114. print("api response: {}".format(response))
  115. except ConnectionError as ex:
  116. print(ex, file=sys.stderr)
  117. except UserWarning as warn:
  118. print(str(warn), file=sys.stderr)
  119. def find_repetitions(results):
  120. found_counter = 0
  121. found_index = None
  122. segments_needed = 2
  123. seconds_needed = 9
  124. threshold = 20
  125. expect_space = False
  126. found = []
  127. for index, result in enumerate(results):
  128. if not expect_space:
  129. if result['confidence'] > threshold:
  130. found_counter += 1
  131. if found_index is None:
  132. found_index = index
  133. else:
  134. found_counter = 0
  135. found_index = None
  136. else:
  137. if result['confidence'] <= threshold:
  138. expect_space = False
  139. if found_counter >= segments_needed:
  140. found.append(results[found_index]['timestamp'])
  141. found_counter = 0
  142. expect_space = True
  143. return found
  144. def iterate_audios(dt, station):
  145. """ Given a datetime object and an station,
  146. iterate a list of files that are between
  147. the the date and itself plus 5 minutes;
  148. station must match too """
  149. from_time = time.mktime(dt.timetuple()) \
  150. - AHEAD_TIME_AUDIO_TOLERANCE
  151. to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
  152. print('from {} to {}'.format(from_time, to_time))
  153. cursor = db.cursor()
  154. cursor.execute((
  155. 'select "filename", "timestamp" '
  156. 'from "file" '
  157. 'where "timestamp" between ? and ? '
  158. 'and "station" = ? '
  159. 'order by "timestamp" asc'
  160. ),
  161. (from_time, to_time, station, ),
  162. )
  163. files = [file for file in cursor]
  164. cursor.close()
  165. for mp3 in files:
  166. mp3path, ts = mp3
  167. mp3name = os.path.basename(mp3path)
  168. yield (mp3path, mp3name, ts)
  169. def cloud_download(ad_key=None):
  170. """ Given an ad key, the file is downloaded to
  171. the system temporal folder to be processed """
  172. if ad_key in cloud_cache:
  173. """ If this file has already been downloaded,
  174. will not be downloaded again, instead will
  175. be taken from cloud_cache dictionary """
  176. filename, md5hash = cloud_cache[ad_key]
  177. if os.path.isfile(filename):
  178. return filename, md5hash
  179. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  180. filename = os.path.basename(ad['path'])
  181. out_file = os.path.join(AUDIOS_PATH, filename)
  182. url = '{}/{}'.format(cloud_base_url, ad['path'])
  183. response = requests.get(url)
  184. if response.status_code == 200:
  185. hashes = response.headers['x-goog-hash']
  186. hashes = hashes.split(',')
  187. hashes = [h.split('=', 1) for h in hashes]
  188. hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
  189. md5sum = hashes['md5']
  190. with open(out_file, "wb") as fp:
  191. fp.write(response.content)
  192. tp = (out_file, md5sum)
  193. cloud_cache[ad_key] = tp
  194. return tp
  195. app = setup_endpoint(queue=queue)
  196. loop = IOLoop.current()
  197. loop.add_callback(feed_queue)
  198. if __name__ == '__main__':
  199. try:
  200. loop.start()
  201. except KeyboardInterrupt:
  202. print('process killed')