service.py

# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import
from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from ondemand.endpoint import setup_endpoint
from ondemand.calibration import Calibrations
from fourier.dejavu import Dejavu, CouldntDecodeError
from firebase_admin import credentials
from firebase_admin import db as fbdb
from binascii import hexlify
from base64 import b64decode
from threading import Thread
from multiprocessing import Process
from argparse import ArgumentParser
import logging as log
import firebase_admin
import mutagen.mp3
import OpenSSL.SSL
import requests
import dateutil.parser
import sqlite3
import math
import time
import sys
import os

if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty

log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)

AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

# Threshold modes: THRESHOLD_FIXED takes the fixed threshold
# from the calibration; THRESHOLD_AVERAGE uses the average
# confidence of the results (see find_repetitions).
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

# Queue processing modes:
#  - QUEUE_SINGLE: process only one segment at a time
#  - QUEUE_THREAD: start a thread for each segment
# Threaded mode is used by default.
# TODO: make this configurable through command-line
# arguments (see the sketch below).
QUEUE_SINGLE = 1
QUEUE_THREAD = 2
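
# A minimal sketch of the TODO above, using the already-imported
# ArgumentParser. The flag name and the helper below are assumptions
# for illustration; nothing in the service calls it yet.
def parse_queue_mode_args():
    """ Hypothetical CLI parsing for the queue processing mode. """
    parser = ArgumentParser(description='ondemand recognition service')
    parser.add_argument('--queue-mode', choices=('single', 'thread'),
                        default='thread',
                        help='process segments one at a time or threaded')
    args = parser.parse_args()
    return QUEUE_SINGLE if args.queue_mode == 'single' else QUEUE_THREAD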

# Either of two APIs can be used here:
# threading's Thread or multiprocessing's Process.
MultiAPI = Process

config = parse_config()
queue = Queue()
client = Client(config['device_id'],
                config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}' \
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
dejavu = Dejavu({"database_type": "mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)

# settings
queue_mode = QUEUE_THREAD
threshold_mode = THRESHOLD_FIXED

db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
cloud_cache = {}
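# cloud_cache maps an ad key to a (local filename, hex md5 digest)
# tuple, as populated by cloud_download further below.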

def feed_queue():
    """ Search for pending scheduled work on the
    server and add it to the in-memory queue. """
    try:
        response = client.get_schedule_pending()
        downloaded_counter = len(response['items'])
        for item in response['items']:
            queue.put(item)
        if downloaded_counter:
            log.info(('[feed_queue] {} new '
                      + 'pending schedule items.')
                     .format(downloaded_counter))
        if queue.qsize() > 0:
            if queue_mode == QUEUE_THREAD:
                loop.add_callback(process_queue_with_threads)
            else:
                loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)
    except ConnectionError as ex:
        log.error('[feed_queue] cannot feed: {}, retrying later'.format(ex))
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as ex:
        # Unknown errors
        log.error('[feed_queue] {}'.format(ex))
        loop.add_timeout(time.time() + 60, feed_queue)
        raise ex

def process_queue():
    """ Try to get the next item from the queue and
    process it accordingly. On success, repeat; go
    back to feeding when there are no more items. """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        log.error(ex)
        loop.add_callback(process_queue)

def process_queue_with_threads():
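    """ Drain the queue using up to MAX_SEGMENT_THREADS concurrent
    workers (threads or processes, depending on MultiAPI), then
    hand control back to feed_queue. """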
    threads = [None] * MAX_SEGMENT_THREADS
    is_drained = False
    log.info('Starting thread processing')
    while True:
        for index, t in enumerate(threads):
            if not t:
                try:
                    item = queue.get(False)
                    station = item['station']
                    date = dateutil.parser.parse(item['date'])
                    calibration = calibrations.get(station)
                    audios = [f for f in iterate_audios(
                        date, station,
                        calibration=calibration,
                    )]
                    thread = MultiAPI(target=process_segment,
                                      args=(item,),
                                      kwargs={
                                          'audios': audios,
                                          'calibration': calibration,
                                      })
                    threads[index] = thread
                    thread.start()
                except Empty:
                    is_drained = True
                except Exception as err:
                    log.error('[process_queue_with_threads] [{}] {}'.format(
                        station,
                        err,
                    ))
                    continue
            elif not t.is_alive():
                threads[index] = None
        if is_drained:
            if threads.count(None) == MAX_SEGMENT_THREADS:
                break
    log.info('Finished thread processing')
    loop.add_callback(feed_queue)

def process_segment(item, audios=None, calibration=None):
    """ Processes one hour of audio. """
    station = item['station']
    if not calibration:
        calibration = calibrations.get(station)
    tolerance = calibration['tolerance']
    date = dateutil.parser.parse(item['date'])
    segment_size = calibration['segmentSize']
    audio_length = 0
    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}'
             .format(
                 calibration['threshold'],
                 calibration['tolerance'],
                 calibration['fallTolerance'],
                 calibration['segmentSize'],
                 calibration['hourlyOffset'],
                 item,
             ))

    # 1. Fetch the audio from Firebase
    # and compute its fingerprint.
    try:
        filename, md5hash = cloud_download(ad_key=item['ad'])
        if not filename:
            log.info('[process_segment] ad file missing')
            return
    except Exception as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 1.1 Compute the number of segments required
    # according to the total duration of the audio.
    try:
        audio = mutagen.mp3.MP3(filename)
        audio_length = audio.info.length
        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            segment_size = int(math.ceil(audio_length / 5)) * 5
        segments_needed = int(round(float(audio_length) / float(segment_size)))
        segments_needed = int(round(segments_needed * tolerance))
    except Exception as ex:
        log.error('[process_segment] file {} is not an mp3'.format(filename))
        log.error(str(ex))
        return

    try:
        dejavu.fingerprint_file(filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))

    """ There are two possible scenarios when obtaining the audios:
        a. The audios come in through the function's "audios"
           parameter, as a list.
        b. The audios are fetched directly from the database
           through a cursor.
    """
    try:
        audios_iterable = audios if audios \
            else iterate_audios(date, station, calibration=calibration)
    except sqlite3.OperationalError as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 2. Read the list of files from the local database
    audios_counter = 0
    results = []
    for path, name, ts in audios_iterable:
        short_path = os.path.join(station, name)
        audios_counter += os.path.isfile(path)
        values = []
        if not os.path.isfile(path):
            log.error('[process_segment] file not found: {}'
                      .format(short_path))
            continue
        try:
            for match in dejavu.recognize(recognizer, path, segment_size,
                                          ads_filter=[md5hash]):
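                # Each match apparently exposes 'confidence', 'offset'
                # and 'length'; the /1000 below suggests 'length' is in
                # milliseconds (inferred from usage, not documented here).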
                results.append({
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                })
                values.append(str(match['confidence']))
                ts += match['length'] / 1000
            log.info('[process_segment] [{3}] {2} {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
                item['ad'],
                station,
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))
    try:
        response = client.put_schedule_results(
            item['schedule'],
            item['id'],
            None,  # TODO: send results again
            found=find_repetitions(results,
                                   segments_needed=segments_needed,
                                   calibration=calibration),
            missing_files=(12 - audios_counter)
                if audios_counter < 12 else 0,
        )
        log.info('[{}] API response: {}'.format(station, response))
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))

def find_repetitions(results, segments_needed=2, calibration=None):
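    """ Scan the confidence series in results for runs of at
    least segments_needed values at or above the threshold,
    tolerating brief dips when fallTolerance is set; returns
    the starting timestamps of the detected repetitions. """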
    found_counter = 0
    found_down_counter = 0
    found_index = None
    expect_space = False
    expect_recover = False
    last_value_in_threshold_index = -1
    fall_tolerance = calibration['fallTolerance']
    found = []

    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))

    if segments_needed < 1:
        segments_needed = 1

    for index, result in enumerate(results):
        if not expect_space:
            if result['confidence'] >= threshold:
                found_counter += 1
                last_value_in_threshold_index = index
                if found_index is None:
                    found_index = index
                if expect_recover:
                    found_counter += found_down_counter
                    expect_recover = False
            elif fall_tolerance:
                if not expect_recover:
                    if last_value_in_threshold_index != -1:
                        # A low value can only be expected once at
                        # least one value has already fallen within
                        # the threshold range.
                        expect_recover = True
                        found_down_counter += 1
                    else:
                        pass
                else:
                    # If another value falls outside the threshold
                    # right after one element has already been
                    # tolerated, the run is considered lost.
                    found_counter = 0
                    found_down_counter = 0
                    found_index = None
                    expect_recover = False
            else:
                found_counter = 0
                found_down_counter = 0
                found_index = None
                expect_recover = False
        else:
            if result['confidence'] <= threshold:
                expect_space = False
        if found_counter >= segments_needed:
            found.append(results[found_index]['timestamp'])
            found_counter = 0
            expect_space = True
    return found
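
# Illustrative walk-through (assumed numbers, not taken from the service):
# with a fixed threshold of 10, fallTolerance enabled and segments_needed=3,
# the confidence series [12, 11, 4, 13] still yields one hit, because the
# single dip to 4 is tolerated and counted back when the series recovers
# at 13.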

def iterate_audios(dt, station, calibration=None):
    """ Given a datetime object and a station, iterate
    over the files that fall between the date and the
    date plus one hour; the station must match too. """
    tm = time.mktime(dt.timetuple())
    if calibration and calibration['hourlyOffset']:
        hoffset = calibration['hourlyOffset']
        from_time = tm + hoffset
        to_time = tm + 3599 + hoffset
    elif AHEAD_TIME_AUDIO_TOLERANCE:
        # Conventional mode
        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    log.info('from {} to {}'.format(int(from_time), int(to_time)))
    cursor = db.cursor()
    cursor.execute(
        (
            'select "filename", "timestamp" '
            'from "file" '
            'where "timestamp" between ? and ? '
            'and "station" = ? '
            'order by "timestamp" asc'
        ),
        (from_time, to_time, station,),
    )
    files = [file for file in cursor]
    cursor.close()
    for mp3 in files:
        mp3path, ts = mp3
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)

def cloud_download(ad_key=None):
    """ Given an ad key, download the file to the
    system's temporary folder for processing. """
    if ad_key in cloud_cache:
        # If this file has already been downloaded it is
        # not downloaded again; it is taken from the
        # cloud_cache dictionary instead.
        filename, md5hash = cloud_cache[ad_key]
        if os.path.isfile(filename):
            return filename, md5hash

    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])
    response = requests.get(url)
    if response.status_code == 200:
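        # The x-goog-hash header carries base64-encoded checksums,
        # e.g. 'crc32c=AAAAAA==,md5=1B2M2Y8AsgTpgAmY7PhCfg==' (an
        # illustrative value, not taken from this service); it is
        # parsed below into hex digests keyed by algorithm name.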
        hashes = response.headers['x-goog-hash']
        hashes = hashes.split(',')
        hashes = [h.split('=', 1) for h in hashes]
        hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
        md5sum = hashes['md5']
        with open(out_file, "wb") as fp:
            fp.write(response.content)
        tp = (out_file, md5sum,)
        cloud_cache[ad_key] = tp
        return tp
    # The caller unpacks two values, so return a pair on failure too.
    return None, None

app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')