# service.py
  1. # -*- coding: utf8 -*-
  2. from __future__ import print_function, absolute_import
  3. from tornado.ioloop import IOLoop
  4. from tornado.web import Application
  5. from fourier.api.client import Client, ConnectionError
  6. from fourier.boxconfig import parse_config
  7. from fourier.dejavu.recognize import FilePerSecondRecognizer
  8. from datetime import datetime, timedelta
  9. from ondemand.endpoint import setup_endpoint
  10. from ondemand.calibration import Calibrations
  11. from fourier.dejavu import Dejavu, CouldntDecodeError
  12. from firebase_admin import credentials
  13. from firebase_admin import db as fbdb
  14. from binascii import hexlify
  15. from base64 import b64decode
  16. from threading import Thread
  17. from multiprocessing import Process
  18. from argparse import ArgumentParser
  19. from subprocess import Popen, PIPE
  20. import logging as log
  21. import firebase_admin
  22. import mutagen.mp3
  23. import OpenSSL.SSL
  24. import subprocess
  25. import requests
  26. import dateutil
  27. import hashlib
  28. import sqlite3
  29. import math
  30. import time
  31. import sys
  32. import os
# Py2/Py3 compatibility: the stdlib queue module was renamed in Python 3.
if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty
log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)

# Where downloaded ad audio files are stored while being processed.
AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # second
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1
# THRESHOLD modes: fixed value from calibration, or average of confidences.
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2
# Queue processing modes:
# - QUEUE_SINGLE: processes only one segment at a time
# - QUEUE_THREAD: starts a thread for each segment
# The threaded one is used by default.
# TODO: make it configurable through command-line
# arguments.
QUEUE_SINGLE = 1
QUEUE_THREAD = 2
# Different APIs can be used here:
# the threading one and the multiprocessing one.
MultiAPI = Process

# --- module-level state, created at import time (side effects) ---
config = parse_config()
queue = Queue()
client = Client(config['device_id'],
                config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}' \
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)
# settings
queue_mode = QUEUE_SINGLE
threshold_mode = THRESHOLD_FIXED
# Local SQLite database holding the recorded-file index.
db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
# ad_key -> (filename, md5) cache for already-downloaded ads.
cloud_cache = {}
  78. def feed_queue():
  79. """ Search for pending scheduled work in
  80. server and add them to a memory queue. """
  81. try:
  82. response = get_pendings()
  83. # response = client.get_schedule_pending()
  84. # downloaded_counter = len(response['items'])
  85. # for item in response['items']:
  86. if len(response["elementos"]) > 0:
  87. queue.put(response)
  88. if queue.qsize() > 0:
  89. if queue_mode == QUEUE_THREAD:
  90. loop.add_callback(process_queue_with_threads)
  91. else:
  92. loop.add_callback(process_queue)
  93. else:
  94. loop.add_timeout(time.time() + 30, feed_queue)
  95. except ConnectionError as ex:
  96. log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
  97. loop.add_timeout(time.time() + 15, feed_queue)
  98. except Exception as ex:
  99. """ Errores desconocidos """
  100. log.error('[feed_queue] {}'.format(ex))
  101. loop.add_timeout(time.time() + 60, feed_queue)
  102. raise ex
  103. def process_queue():
  104. """ Try to the next item in a queue and start
  105. processing it accordingly. If success, repeat
  106. the function or go to feed if no more items. """
  107. try:
  108. item = queue.get(False)
  109. process_segment(item)
  110. loop.add_callback(process_queue)
  111. except Empty:
  112. loop.add_callback(feed_queue)
  113. except Exception as ex:
  114. log.error(ex)
  115. loop.add_callback(process_queue)
  116. def process_queue_with_threads():
  117. threads = [None] * MAX_SEGMENT_THREADS
  118. is_drained = False
  119. log.info('Starting thread processing')
  120. while True:
  121. for index, t in enumerate(threads):
  122. if not t:
  123. try:
  124. item = queue.get(False)
  125. station = item['station']
  126. date = dateutil.parser.parse(item['date'], ignoretz=True)
  127. calibration = calibrations.get(station)
  128. audios = [f for f in iterate_audios(
  129. date, station,
  130. calibration=calibration
  131. )]
  132. thread = MultiAPI(target=process_segment,
  133. args=(item,),
  134. kwargs={
  135. 'audios': audios,
  136. 'calibration': calibration,
  137. }
  138. )
  139. threads[index] = thread
  140. thread.start()
  141. except Empty:
  142. is_drained = True
  143. except Exception as err:
  144. log.error('[process_queue_with_threads] [{}] {}'.format(
  145. station,
  146. err,
  147. ))
  148. continue
  149. elif not t.is_alive():
  150. threads[index] = None
  151. if is_drained:
  152. if threads.count(None) == MAX_SEGMENT_THREADS:
  153. break
  154. log.info('Finished thread processing')
  155. loop.add_callback(feed_queue)
def process_segment(item, audios=None, calibration=None):
    """ Processes one hour of audio.

    Downloads the scheduled ads, fingerprints them with an in-memory
    Dejavu instance, runs recognition over the hour's recorded files,
    collects repeated matches per ad and POSTs the results back to the
    API via send_results.

    :param item: schedule payload with 'estacion', 'fecha' and
        'elementos' (each element has 'id' and 'anuncio' keys).
    :param audios: optional pre-fetched list of (path, name, ts)
        tuples; when None, they are read from the local database.
    :param calibration: optional calibration dict; fetched by station
        when not provided.
    """
    station = item['estacion']
    if not calibration:
        calibration = calibrations.get(station)
    tolerance = calibration['tolerance']
    date = dateutil.parser.parse(item['fecha'], ignoretz=True)
    segment_size = calibration['segmentSize']
    audio_length = 0
    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
        .format(
            calibration['threshold'],
            calibration['tolerance'],
            calibration['fallTolerance'],
            calibration['segmentSize'],
            calibration['hourlyOffset'],
            item,
        )
    )
    # 1. Fetch the ad audio files from firebase/cloud storage
    # and compute their fingerprints.
    try:
        filenames = []
        id_by_ad = {}   # maps ad key -> schedule element id
        item_ids = []   # distinct element ids, in first-seen order
        x = 0
        for i in item["elementos"]:
            x = x + 1
            log.info('[process_segment] downloading ad {} {}'.format(x, i["anuncio"]))
            id_by_ad[i['anuncio']] = i['id']
            if i['id'] not in item_ids:
                item_ids.append(i['id'])
            filename, md5hash = cloud_download(ad_key=i["anuncio"])
            if filename:
                filenames.append((filename, md5hash))
            else:
                log.info('[process_segment] ad file missing')
    except Exception as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return
    # 1.1 Compute the number of segments required
    # according to the total duration of the audio.
    try:
        filename, md5hash = filenames[0]
        audio = mutagen.mp3.MP3(filename)
        audio_length = audio.info.length
        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            # Round the ad length up to the next multiple of 5 seconds.
            segment_size = int(math.ceil(audio_length / 5)) * 5
        segments_needed = int(round(float(audio_length) / float(segment_size)))
        segments_needed = int(round(segments_needed * tolerance))
    except Exception as ex:
        log.error('[process_segment] file {} is not an mp3'.format(filename))
        log.error(str(ex))
        return
    # In-memory Dejavu database: fingerprints live only for this run.
    dejavu = Dejavu({"database_type": "mem"})
    try:
        x = 0
        for i in filenames:
            filename = i[0]
            dejavu.fingerprint_file(filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))
    # There are two possible scenarios when obtaining the audios:
    #   a. The audios come through the "audios" parameter of the
    #      function, as a list.
    #   b. The audios are obtained directly from the database
    #      as a cursor.
    try:
        audios_iterable = audios if audios \
            else iterate_audios(date, station, calibration=calibration)
    except sqlite3.OperationalError as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return
    # 2. Read the list of files from local database
    audios_counter = 0
    results = []
    v = []
    for path, name, ts in audios_iterable:
        # short_path = os.path.join(station, name)
        audios_counter += os.path.isfile(path)
        values = []
        if not os.path.isfile(path):
            # Recording missing locally: pull it back from cloud storage.
            download_file(path)
        try:
            for match in dejavu.recognize(recognizer, path, segment_size):
                name = None
                ad = None
                try:
                    ad = match['name']
                    if match['name'] in id_by_ad.keys():
                        name = id_by_ad[match['name']]
                    else:
                        name = match['name']
                except KeyError:
                    pass
                results.append({
                    'ad': ad,
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                    'name': name
                })
                values.append(str(match['confidence']))
                # Advance the timestamp by the matched segment length (ms -> s).
                ts += match['length'] / 1000
            v.append(','.join(values))
            log.info('[process_segment] [{2}] {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
                station,
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))
    try:
        # Group matches per element id and look for sustained repetitions.
        encontrados = {}
        for i in item_ids:
            r = [result for result in results if result["name"] == i]
            encontrados[i] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration,)
        # Attach each found repetition back onto its schedule element.
        for id in encontrados:
            for e in encontrados[id]:
                for i in item['elementos']:
                    if i['id'] == id and i['anuncio'] == e['ad']:
                        if 'encontrados' not in i:
                            i['encontrados'] = []
                        i['encontrados'].append(e)
                        break
        # NOTE(review): 12 appears to be the expected number of files per
        # hour (5-minute recordings) — confirm against the recorder.
        item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
        response = send_results(item)
        log.info('[{}] API response: {}'.format(station, response))
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))
def find_repetitions(results, segments_needed=2, calibration=None):
    """ Scan an ordered list of recognition results and return the rows
    where an ad is considered "found".

    A find requires `segments_needed` consecutive results whose
    confidence is at/above the threshold, tolerating a single
    below-threshold dip when the calibration's fallTolerance is set.
    After a find, results are skipped until confidence falls back to
    or below the threshold (expect_space).

    :param results: list of dicts with at least a 'confidence' key,
        in chronological order.
    :param segments_needed: consecutive qualifying segments required
        (clamped to a minimum of 1).
    :param calibration: dict providing 'fallTolerance' and, in fixed
        mode, 'threshold'.
    :return: list of result rows (the first row of each run found).
    """
    found_counter = 0
    found_down_counter = 0
    found_index = None
    expect_space = False
    expect_recover = False
    last_value_in_threshold_index = -1
    fall_tolerance = calibration['fallTolerance']
    found = []
    high = 100  # TODO: get this value from a parameter
    middle_high = 50  # TODO: get this value from a parameter
    segment_middle_needed = 2  # TODO: get this value from a parameter
    found_high = None
    found_middle_high = []
    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        # Average threshold: mean confidence over all results, rounded up.
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))
    if segments_needed < 1:
        segments_needed = 1
    for index, result in enumerate(results):
        #if result['confidence'] >= high:
        #    if found_high is None:
        #        found_high = index
        #    elif result['confidence'] > results[found_high]['confidence']:
        #        found_high = index
        #elif result['confidence'] >= middle_high:
        #    found_middle_high.append(index)
        if not expect_space:
            if result['confidence'] >= threshold:
                found_counter += 1
                last_value_in_threshold_index = index
                if found_index is None:
                    found_index = index
                if expect_recover:
                    # Recovered after a tolerated dip: count the dipped
                    # segments as part of the run.
                    found_counter += found_down_counter
                    expect_recover = False
            elif fall_tolerance:
                if not expect_recover:
                    if last_value_in_threshold_index != -1:
                        # Only once at least one value has entered the
                        # threshold range can a low value be tolerated.
                        expect_recover = True
                        found_down_counter += 1
                    else:
                        pass
                else:
                    # If, after having tolerated 1 element, another one
                    # falls outside the continuous threshold, the run
                    # is considered lost.
                    found_counter = 0
                    found_down_counter = 0
                    found_index = None
                    expect_recover = False
            else:
                # No fall tolerance: any below-threshold value resets the run.
                found_counter = 0
                found_down_counter = 0
                found_index = None
                expect_recover = False
            # Here we would check whether there is a high value.
            #if found_high is not None:
            #    found_row = results[found_high]
            #    found.append(found_row)
            #elif len(found_middle_high) >= segment_middle_needed:
            #    found_row = results[found_middle_high[0]]
            #    found.append(found_row)
            #found_high = None
            #found_middle_high = []
        else:
            if result['confidence'] <= threshold:
                expect_space = False
        if found_counter >= segments_needed:
            found_row = results[found_index]
            found.append(found_row)
            found_counter = 0
            expect_space = True
            #found_high = None
            #found_middle_high = []
    return found
def iterate_audios(dt, station, calibration=None):
    """ Given a datetime object and an station,
    iterate a list of files that are between
    the the date and itself plus 5 minutes;
    station must match too.

    Yields (mp3path, mp3name, timestamp) tuples ordered by timestamp.
    """
    tm = time.mktime(dt.timetuple())
    if calibration and calibration['hourlyOffset']:
        # Calibrated mode: shift the whole hour window by hourlyOffset.
        hoffset = calibration['hourlyOffset']
        from_time = tm + hoffset
        to_time = tm + 3599 + hoffset
    elif AHEAD_TIME_AUDIO_TOLERANCE:
        """ Conventional mode """
        # NOTE(review): the tolerance is added to both window edges, so
        # to_time ends up tm + 3599 + 2*tolerance — confirm intended.
        # Also, if AHEAD_TIME_AUDIO_TOLERANCE were ever falsy, from_time/
        # to_time would be unbound below.
        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    log.info('from {} to {}'.format(int(from_time), int(to_time)))
    cursor = db.cursor()
    cursor.execute((
        'select "filename", "timestamp" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
    ),
        (from_time, to_time, station,),
    )
    # Materialize before closing the cursor so the generator can yield lazily.
    files = [file for file in cursor]
    cursor.close()
    for mp3 in files:
        mp3path, ts = mp3
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)
  403. def cloud_download(ad_key=None):
  404. """ Given an ad key, the file is downloaded to
  405. the system temporal folder to be processed """
  406. if ad_key in cloud_cache:
  407. """ If this file has already been downloaded,
  408. will not be downloaded again, instead will
  409. be taken from cloud_cache dictionary """
  410. filename, md5hash = cloud_cache[ad_key]
  411. if os.path.isfile(filename):
  412. return filename, md5hash
  413. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  414. filename = os.path.basename(ad['path'])
  415. out_file = os.path.join(AUDIOS_PATH, filename)
  416. url = '{}/{}'.format(cloud_base_url, ad['path'])
  417. if(os.path.isfile(out_file)):
  418. return out_file, md5_checksum(out_file)
  419. response = requests.get(url)
  420. if response.status_code == 200:
  421. hashes = response.headers['x-goog-hash']
  422. hashes = hashes.split(',')
  423. hashes = [h.split('=', 1) for h in hashes]
  424. hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
  425. md5sum = hashes['md5']
  426. with open(out_file, "wb") as fp:
  427. fp.write(response.content)
  428. tp = (out_file, md5sum,)
  429. p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_name', '-of',
  430. 'default=nokey=1:noprint_wrappers=1', out_file], stdin=PIPE, stdout=PIPE, stderr=PIPE)
  431. rc = p.returncode
  432. if rc != 'mp3\n':
  433. subprocess.call(['mv', out_file, out_file + '.old'])
  434. subprocess.call(
  435. ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-codec:a', 'libmp3lame',
  436. '-qscale:a', '2', '-f', 'mp3', out_file])
  437. subprocess.call(['rm', '-rf', out_file + '.old'])
  438. cloud_cache[ad_key] = tp
  439. return tp
def download_file(file_path=None):
    """ Download a station recording from cloud storage into
    ``file_path`` and mark it as not-uploaded in the local database. """
    # The cloud object key is the local path relative to /var/fourier/.
    file_path_cloud = file_path.replace("/var/fourier/", "")
    url = '{}/{}'.format(cloud_base_url, file_path_cloud)
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_path, "wb") as fp:
            fp.write(response.content)
        cursor = db.cursor()
        # NOTE(review): there is no db.commit() here and sqlite3 runs in
        # deferred-transaction mode — confirm a commit happens elsewhere
        # or this update may never persist.
        cursor.execute('update "file" set uploaded = 0 where filename = ?', (file_path,), )
        cursor.close()
  450. def get_pendings():
  451. url = 'https://api.fourier.audio/v1/calendario/pendiente?id={}'.format(config['device_id'], )
  452. headers = {
  453. 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
  454. }
  455. response = requests.get(url, headers=headers)
  456. return response.json()
  457. def send_results(item):
  458. url = 'https://api.fourier.audio/v1/calendario/resultado'
  459. headers = {
  460. 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
  461. }
  462. log.info('url: {}'.format(url))
  463. response = requests.post(url, json=item, headers=headers)
  464. return response
  465. def md5_checksum(filename):
  466. hash_md5 = hashlib.md5()
  467. with open(filename, "rb") as f:
  468. for chunk in iter(lambda: f.read(4096), b""):
  469. hash_md5.update(chunk)
  470. return hash_md5.hexdigest()
# Wire the HTTP endpoint to the shared queue and start the feed cycle
# on the Tornado IOLoop.
app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')