# service.py (4.7 KB)
# -*- coding: utf8 -*-
from __future__ import print_function

import os
import sqlite3
import sys
import time
import traceback
from datetime import datetime, timedelta
from Queue import Queue, Empty

import dateutil
import dateutil.parser
import firebase_admin
import requests
from firebase import firebase
from firebase_admin import credentials
from firebase_admin import db as fbdb
from tornado.ioloop import IOLoop
from tornado.web import Application

from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu import Dejavu
from fourier.dejavu.recognize import FilePerSecondRecognizer
  21. AUDIOS_PATH = '/tmp'
  22. config = parse_config()
  23. queue = Queue()
  24. client = Client(config['device_id'],
  25. #config['apiSecret'],
  26. '7002a5a7592f624aea1364d64a11e39f',
  27. base_url='http://localhost:41222')
  28. cloud_base_url = 'https://storage.googleapis.com/{}'\
  29. .format(config['bucket'])
  30. base_path = config.get("basepath", "/var/fourier")
  31. fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
  32. firebase_admin.initialize_app(fb_credentials, config['firebase'])
  33. dejavu = Dejavu({"database_type":"mem"})
  34. device_id = config['device_id']
  35. device_path = os.path.join(base_path, device_id)
  36. recognizer = FilePerSecondRecognizer
  37. db_path = os.path.join(device_path, 'files.db')
  38. db = sqlite3.connect(db_path)
  39. def feed_queue():
  40. try:
  41. response = client.get_schedule_pending()
  42. for item in response['items']:
  43. queue.put(item)
  44. if queue.qsize() > 0:
  45. loop.add_callback(process_queue)
  46. else:
  47. loop.add_timeout(time.time() + 30, feed_queue)
  48. except ConnectionError as ex:
  49. print('cannot feed: {}, retryig later'.format(ex),
  50. file=sys.stderr)
  51. loop.add_timeout(time.time() + 15, feed_queue)
  52. except Exception as ex:
  53. """ Errores desconocidos """
  54. print(ex, file=sys.stderr)
  55. loop.add_timeout(time.time() + 60, feed_queue)
  56. def process_queue():
  57. try:
  58. item = queue.get(False)
  59. process_segment(item)
  60. loop.add_callback(process_queue)
  61. except Empty:
  62. loop.add_callback(feed_queue)
  63. def process_segment(item):
  64. """ Procesa una hora de audio """
  65. station = item['station']
  66. date = dateutil.parser.parse(item['date'])
  67. # 1. obtener el audio desde firebase
  68. # y calcular su fingerprint.
  69. filename = cloud_download(ad_key=item['ad'])
  70. if not filename:
  71. print('ad file missing', file=sys.stderr)
  72. return
  73. dejavu.fingerprint_file(filename)
  74. # 2. leer la lista de archivos de la base
  75. # de datos local de la caja.
  76. for path, name, ts in iterate_audios(date, station):
  77. matches = []
  78. for match in dejavu.recognize(recognizer, path, 5):
  79. matches.append({
  80. 'confidence': match['confidence'],
  81. 'timestamp': ts
  82. })
  83. print("{} {} {}".format(
  84. match['confidence'],
  85. ts,
  86. match['name'],
  87. ))
  88. ts += float(match["length"]) / 1000.0
  89. try:
  90. client.put_schedule_results(
  91. item['schedule'],
  92. item['id'],
  93. matches
  94. )
  95. except ConnectionError as ex:
  96. print(ex, file=sys.stderr)
  97. def iterate_audios(dt, station):
  98. from_time = time.mktime(dt.timetuple())
  99. to_time = from_time + 5 * 60
  100. cursor = db.cursor()
  101. cursor.execute((
  102. 'select "filename" '
  103. 'from "file" '
  104. 'where "timestamp" between ? and ? '
  105. 'and "station" = ? '
  106. 'order by "timestamp" asc'
  107. ),
  108. (from_time, to_time, station, ),
  109. )
  110. files = []
  111. for file in cursor:
  112. files.append(file[0])
  113. for mp3path in files:
  114. mp3name = os.path.basename(mp3path)
  115. mp3date = parse_date_filename(mp3_name)
  116. if mp3date:
  117. mp3ts = int(mp3date.strftime("%s"))
  118. yield (mp3path, mp3name, mp3ts)
  119. def cloud_download(ad_key=None):
  120. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  121. filename = os.path.basename(ad['path'])
  122. out_file = os.path.join(AUDIOS_PATH, filename)
  123. url = '{}/{}'.format(cloud_base_url, ad['path'])
  124. response = requests.get(url)
  125. if response.status_code == 200:
  126. with open(out_file, "wb") as fp:
  127. fp.write(response.content)
  128. return out_file
  129. else:
  130. print('no')
  131. return None
  132. def parse_date_filename(val):
  133. try:
  134. return strptime(val[:19], '%Y-%m-%dT%H-%M-%S')
  135. except ValueError:
  136. return strptime(val[:19], '%Y-%m-%d-%H-%M-%S')
  137. app = Application()
  138. loop = IOLoop.current()
  139. loop.add_callback(feed_queue)
  140. if __name__ == '__main__':
  141. loop.start()