service.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. # -*- coding: utf8 -*-
  2. from __future__ import print_function
  3. from tornado.ioloop import IOLoop
  4. from tornado.web import Application
  5. from fourier.api.client import Client, ConnectionError
  6. from fourier.boxconfig import parse_config
  7. from fourier.dejavu.recognize import FilePerSecondRecognizer
  8. from datetime import datetime, timedelta
  9. from fourier.dejavu import Dejavu
  10. from Queue import Queue, Empty
  11. from firebase_admin import credentials
  12. from firebase_admin import db as fbdb
  13. import firebase_admin
  14. import requests
  15. import dateutil
  16. import sqlite3
  17. import time
  18. import sys
  19. import os
  20. AUDIOS_PATH = '/tmp'
  21. config = parse_config()
  22. queue = Queue()
  23. client = Client(config['device_id'],
  24. config['apiSecret'])
  25. cloud_base_url = 'https://storage.googleapis.com/{}'\
  26. .format(config['bucket'])
  27. base_path = config.get("basepath", "/var/fourier")
  28. fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
  29. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  30. dejavu = Dejavu({"database_type":"mem"})
  31. device_id = config['device_id']
  32. device_path = os.path.join(base_path, device_id)
  33. recognizer = FilePerSecondRecognizer
  34. db_path = os.path.join(device_path, 'files.db')
  35. db = sqlite3.connect(db_path)
  36. def feed_queue():
  37. """ Search for pending scheduled work in
  38. server and add them to a memory queue. """
  39. try:
  40. response = client.get_schedule_pending()
  41. for item in response['items']:
  42. queue.put(item)
  43. if queue.qsize() > 0:
  44. loop.add_callback(process_queue)
  45. else:
  46. loop.add_timeout(time.time() + 30, feed_queue)
  47. except ConnectionError as ex:
  48. print('cannot feed: {}, retryig later'.format(ex),
  49. file=sys.stderr)
  50. loop.add_timeout(time.time() + 15, feed_queue)
  51. except Exception as ex:
  52. """ Errores desconocidos """
  53. print(ex, file=sys.stderr)
  54. loop.add_timeout(time.time() + 60, feed_queue)
  55. def process_queue():
  56. """ Try to the next item in a queue and start
  57. processing it accordingly. If success, repeat
  58. the function or go to feed if no more items. """
  59. try:
  60. item = queue.get(False)
  61. process_segment(item)
  62. loop.add_callback(process_queue)
  63. except Empty:
  64. loop.add_callback(feed_queue)
  65. except Exception as ex:
  66. print(str(ex), file=sys.stderr)
  67. loop.add_callback(process_queue)
  68. def process_segment(item):
  69. """ Procesa una hora de audio """
  70. station = item['station']
  71. date = dateutil.parser.parse(item['date'])
  72. # 1. obtener el audio desde firebase
  73. # y calcular su fingerprint.
  74. filename = cloud_download(ad_key=item['ad'])
  75. if not filename:
  76. print('ad file missing', file=sys.stderr)
  77. return
  78. try:
  79. dejavu.fingerprint_file(filename)
  80. except Exception as ex:
  81. print('cannot fingerprint: {}'.format(ex))
  82. # 2. Read the list of files from local database
  83. results = []
  84. for path, name, ts in iterate_audios(date, station):
  85. print(path)
  86. for match in dejavu.recognize(recognizer, path, 5):
  87. try:
  88. results.append({
  89. 'confidence': match['confidence'],
  90. 'timestamp': ts
  91. })
  92. print("{} {}".format(ts, match['confidence']))
  93. except KeyError as ex:
  94. print(ex, file=sys.stderr)
  95. ts += match['length'] / 1000
  96. # 3. Look for success or failure
  97. ts = 0
  98. found_counter = 0
  99. found_index = None
  100. segments_needed = 2
  101. seconds_needed = 9
  102. threshold = 20
  103. found = None
  104. for index, result in enumerate(results):
  105. if result['confidence'] > threshold:
  106. found_counter += 1
  107. if found_index is None:
  108. found_index = index
  109. else:
  110. found_counter = 0
  111. found_index = None
  112. if found_counter >= segments_needed:
  113. """ TODO: It will be neccessary to improve this
  114. further, so it can recognize more than one
  115. audio in the same segment of 1 hour. Also the
  116. seconds transcurred is important; a time
  117. difference is needed """
  118. found = results[found_index]
  119. break
  120. try:
  121. response = client.put_schedule_results(
  122. item['schedule'],
  123. item['id'],
  124. results,
  125. found=found['timestamp'] if found else None,
  126. )
  127. print(len(results))
  128. print(results)
  129. #print(response)
  130. except ConnectionError as ex:
  131. print(ex, file=sys.stderr)
  132. except UserWarning as warn:
  133. print(str(warn), file=sys.stderr)
  134. def iterate_audios(dt, station):
  135. """ Given a datetime object and an station,
  136. iterate a list of files that are between
  137. the the date and itself plus 5 minutes;
  138. station must match too """
  139. from_time = time.mktime(dt.timetuple())
  140. to_time = from_time + 3599
  141. print('from {} to {}'.format(from_time, to_time))
  142. cursor = db.cursor()
  143. cursor.execute((
  144. 'select "filename", "timestamp" '
  145. 'from "file" '
  146. 'where "timestamp" between ? and ? '
  147. 'and "station" = ? '
  148. 'order by "timestamp" asc'
  149. ),
  150. (from_time, to_time, station, ),
  151. )
  152. files = [file for file in cursor]
  153. cursor.close()
  154. for mp3 in files:
  155. mp3path, ts = mp3
  156. mp3name = os.path.basename(mp3path)
  157. yield (mp3path, mp3name, ts)
  158. def cloud_download(ad_key=None):
  159. """ Given an ad key, the file is downloaded to
  160. the system temporal folder to be processed """
  161. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  162. filename = os.path.basename(ad['path'])
  163. out_file = os.path.join(AUDIOS_PATH, filename)
  164. url = '{}/{}'.format(cloud_base_url, ad['path'])
  165. response = requests.get(url)
  166. if response.status_code == 200:
  167. with open(out_file, "wb") as fp:
  168. fp.write(response.content)
  169. return out_file
  170. app = Application()
  171. loop = IOLoop.current()
  172. loop.add_callback(feed_queue)
  173. if __name__ == '__main__':
  174. loop.start()