|
- # -*- coding: utf8 -*-
- from __future__ import print_function, absolute_import
- from tornado.ioloop import IOLoop
- from tornado.web import Application
- from fourier.api.client import Client, ConnectionError
- from fourier.boxconfig import parse_config
- from fourier.dejavu.recognize import FilePerSecondRecognizer
- from datetime import datetime, timedelta
- from ondemand.endpoint import setup_endpoint
- from ondemand.calibration import Calibrations
- from fourier.dejavu import Dejavu, CouldntDecodeError
- from firebase_admin import credentials
- from firebase_admin import db as fbdb
- from binascii import hexlify
- from base64 import b64decode
- from threading import Thread
- from multiprocessing import Process
- from argparse import ArgumentParser
- from subprocess import Popen, PIPE
- import logging as log
- import firebase_admin
- import mutagen.mp3
- import OpenSSL.SSL
- import subprocess
- import requests
- import dateutil
- import hashlib
- import sqlite3
- import math
- import time
- import sys
- import os
- if sys.version_info >= (3, 0):
- from queue import Queue, Empty
- else:
- from Queue import Queue, Empty
- log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
- AUDIOS_PATH = '/tmp'
- AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
- MAX_SEGMENT_THREADS = 4
- THRESHOLD = 10
- SEGMENTS_TOLERANCE_RATE = 0.6
- FALL_TOLERANCE_SEGMENTS = 1
- # THRESHOLD
- THRESHOLD_FIXED = 1
- THRESHOLD_AVERAGE = 2
- # Modos de procesamiento de queue
- # - QUEQUE_SINGLE: procesa solo un segmento a la vez
- # - QUEUE_THREAD: inicia un hilo para cada segmento
- # Por default se usará el threaded.
- # TODO: hacerlo configurable por medio de argumentos
- # de ejecución.
- QUEUE_SINGLE = 1
- QUEUE_THREAD = 2
- # Se pueden usar diferentes API'se
- # la de threading y la de multiprocessing.
- MultiAPI = Process
- config = parse_config()
- queue = Queue()
- client = Client(config['device_id'],
- config['apiSecret'])
- cloud_base_url = 'https://storage.googleapis.com/{}' \
- .format(config['bucket'])
- base_path = config.get("basepath", "/var/fourier")
- fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
- firebase_admin.initialize_app(fb_credentials, config['firebase'])
- device_id = config['device_id']
- device_path = os.path.join(base_path, device_id)
- recognizer = FilePerSecondRecognizer
- device_ref = fbdb.reference('devices').child(config['device_id'])
- calibrations = Calibrations(config['device_id'], client=client)
- # settings
- queue_mode = QUEUE_SINGLE
- threshold_mode = THRESHOLD_FIXED
- db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
- db = sqlite3.connect(db_path)
- cloud_cache = {}
- def feed_queue():
- """ Search for pending scheduled work in
- server and add them to a memory queue. """
- try:
- response = get_pendings()
- # response = client.get_schedule_pending()
- # downloaded_counter = len(response['items'])
- # for item in response['items']:
- if len(response["elementos"]) > 0:
- queue.put(response)
- if queue.qsize() > 0:
- if queue_mode == QUEUE_THREAD:
- loop.add_callback(process_queue_with_threads)
- else:
- loop.add_callback(process_queue)
- else:
- loop.add_timeout(time.time() + 30, feed_queue)
- except ConnectionError as ex:
- log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
- loop.add_timeout(time.time() + 15, feed_queue)
- except Exception as ex:
- """ Errores desconocidos """
- log.error('[feed_queue] {}'.format(ex))
- loop.add_timeout(time.time() + 60, feed_queue)
- raise ex
- def process_queue():
- """ Try to the next item in a queue and start
- processing it accordingly. If success, repeat
- the function or go to feed if no more items. """
- try:
- item = queue.get(False)
- process_segment(item)
- loop.add_callback(process_queue)
- except Empty:
- loop.add_callback(feed_queue)
- except Exception as ex:
- log.error(ex)
- loop.add_callback(process_queue)
- def process_queue_with_threads():
- threads = [None] * MAX_SEGMENT_THREADS
- is_drained = False
- log.info('Starting thread processing')
- while True:
- for index, t in enumerate(threads):
- if not t:
- try:
- item = queue.get(False)
- station = item['station']
- date = dateutil.parser.parse(item['date'], ignoretz=True)
- calibration = calibrations.get(station)
- audios = [f for f in iterate_audios(
- date, station,
- calibration=calibration
- )]
- thread = MultiAPI(target=process_segment,
- args=(item,),
- kwargs={
- 'audios': audios,
- 'calibration': calibration,
- }
- )
- threads[index] = thread
- thread.start()
- except Empty:
- is_drained = True
- except Exception as err:
- log.error('[process_queue_with_threads] [{}] {}'.format(
- station,
- err,
- ))
- continue
- elif not t.is_alive():
- threads[index] = None
- if is_drained:
- if threads.count(None) == MAX_SEGMENT_THREADS:
- break
- log.info('Finished thread processing')
- loop.add_callback(feed_queue)
- def process_segment(item, audios=None, calibration=None):
- """ Procesa una hora de audio """
- station = item['estacion']
- if not calibration:
- calibration = calibrations.get(station)
- tolerance = calibration['tolerance']
- date = dateutil.parser.parse(item['fecha'], ignoretz=True)
- segment_size = calibration['segmentSize']
- audio_length = 0
- log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
- .format(
- calibration['threshold'],
- calibration['tolerance'],
- calibration['fallTolerance'],
- calibration['segmentSize'],
- calibration['hourlyOffset'],
- item,
- )
- )
- # 1. obtener el audio desde firebase
- # y calcular su fingerprint.
- try:
- filenames = []
- id_by_ad = {}
- item_ids = []
- x = 0
- for i in item["elementos"]:
- x = x + 1
- log.info('[process_segment] downloading ad {} {}'.format(x, i["anuncio"]))
- id_by_ad[i['anuncio']] = i['id']
- if i['id'] not in item_ids:
- item_ids.append(i['id'])
- filename, md5hash = cloud_download(ad_key=i["anuncio"])
- if filename:
- filenames.append((filename, md5hash))
- else:
- log.info('[process_segment] ad file missing')
- except Exception as err:
- log.error('[process_segment] [{}] {}'.format(station, err))
- return
- # 1.1 Calcular el número de segmentos requeridos
- # de acuerdo a la duración total del audio.
- try:
- filename, md5hash = filenames[0]
- audio = mutagen.mp3.MP3(filename)
- audio_length = audio.info.length
- if segment_size == 'integer':
- segment_size = int(audio_length)
- elif segment_size == 'ceil':
- segment_size = int(math.ceil(audio_length / 5)) * 5
- segments_needed = int(round(float(audio_length) / float(segment_size)))
- segments_needed = int(round(segments_needed * tolerance))
- except Exception as ex:
- log.error('[process_segment] file {} is not an mp3'.format(filename))
- log.error(str(ex))
- return
- dejavu = Dejavu({"database_type": "mem"})
- try:
- x = 0
- for i in filenames:
- filename = i[0]
- dejavu.fingerprint_file(filename)
- except Exception as ex:
- log.error('[process_segment] cannot fingerprint: {}'.format(ex))
- """ Hay dos posibles escensarios al obtener los audios
- a. Los audios vienen por el parámetro "audios" de la
- función, siendo esta una lista.
- b. Los audios se obtienen directamente de la base
- de datos en modo de cursor.
- """
- try:
- audios_iterable = audios if audios \
- else iterate_audios(date, station, calibration=calibration)
- except sqlite3.OperationalError as err:
- log.error('[process_segment] [{}] {}'.format(station, err))
- return
- # 2. Read the list of files from local database
- audios_counter = 0
- results = []
- v = []
- for path, name, ts in audios_iterable:
- # short_path = os.path.join(station, name)
- audios_counter += os.path.isfile(path)
- values = []
- if not os.path.isfile(path):
- download_file(path)
- try:
- for match in dejavu.recognize(recognizer, path, segment_size):
- name = None
- ad = None
- try:
- ad = match['name']
- if match['name'] in id_by_ad.keys():
- name = id_by_ad[match['name']]
- else:
- name = match['name']
- except KeyError:
- pass
- results.append({
- 'ad': ad,
- 'confidence': match['confidence'],
- 'timestamp': ts,
- 'offset': match['offset'],
- 'name': name
- })
- values.append(str(match['confidence']))
- ts += match['length'] / 1000
- v.append(','.join(values))
- log.info('[process_segment] [{2}] {0}) {1}'.format(
- os.path.split(path)[-1],
- ','.join(values),
- station,
- ))
- except CouldntDecodeError as ex:
- log.error('[process_segment] {}'.format(ex))
- try:
- encontrados = {}
- for i in item_ids:
- r = [result for result in results if result["name"] == i]
- encontrados[i] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration,)
- for id in encontrados:
- for e in encontrados[id]:
- for i in item['elementos']:
- if i['id'] == id and i['anuncio'] == e['ad']:
- if 'encontrados' not in i:
- i['encontrados'] = []
- i['encontrados'].append(e)
- break
- item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
- response = send_results(item)
- log.info('[{}] API response: {}'.format(station, response))
- except ConnectionError as ex:
- log.error('[process_segment] {}'.format(str(ex)))
- except UserWarning as warn:
- log.warning(str(warn))
- def find_repetitions(results, segments_needed=2, calibration=None):
- found_counter = 0
- found_down_counter = 0
- found_index = None
- expect_space = False
- expect_recover = False
- last_value_in_threshold_index = -1
- fall_tolerance = calibration['fallTolerance']
- found = []
- high = 100 # Obtener este valor desde un parámetro
- middle_high = 50 # Obtener este valor desde un parámetro
- segment_middle_needed = 2 # Obtener este valor desde un parámetro
- found_high = None
- found_middle_high = []
- if threshold_mode == THRESHOLD_FIXED:
- threshold = calibration['threshold']
- elif threshold_mode == THRESHOLD_AVERAGE:
- values = [x['confidence'] for x in results]
- threshold = math.ceil(float(sum(values)) / float(len(values)))
- if segments_needed < 1:
- segments_needed = 1
- for index, result in enumerate(results):
- #if result['confidence'] >= high:
- # if found_high is None:
- # found_high = index
- # elif result['confidence'] > results[found_high]['confidence']:
- # found_high = index
- #elif result['confidence'] >= middle_high:
- # found_middle_high.append(index)
- if not expect_space:
- if result['confidence'] >= threshold:
- found_counter += 1
- last_value_in_threshold_index = index
- if found_index is None:
- found_index = index
- if expect_recover:
- found_counter += found_down_counter
- expect_recover = False
- elif fall_tolerance:
- if not expect_recover:
- if last_value_in_threshold_index != -1:
- """ Solo cuando ya haya entrado por lo menos
- un valor en el rango del threshold, es cuando
- se podrá esperar un valor bajo """
- expect_recover = True
- found_down_counter += 1
- else:
- pass
- else:
- """ Si después de haber pasado tolerado 1 elemento
- vuelve a salir otro fuera del threshold continuo,
- entonces ya se da por perdido """
- found_counter = 0
- found_down_counter = 0
- found_index = None
- expect_recover = False
- else:
- found_counter = 0
- found_down_counter = 0
- found_index = None
- expect_recover = False
- # Aquí veremos si hay un valor alto
- #if found_high is not None:
- # found_row = results[found_high]
- # found.append(found_row)
- #elif len(found_middle_high) >= segment_middle_needed:
- # found_row = results[found_middle_high[0]]
- # found.append(found_row)
- #found_high = None
- #found_middle_high = []
- else:
- if result['confidence'] <= threshold:
- expect_space = False
- if found_counter >= segments_needed:
- found_row = results[found_index]
- found.append(found_row)
- found_counter = 0
- expect_space = True
- #found_high = None
- #found_middle_high = []
- return found
- def iterate_audios(dt, station, calibration=None):
- """ Given a datetime object and an station,
- iterate a list of files that are between
- the the date and itself plus 5 minutes;
- station must match too """
- tm = time.mktime(dt.timetuple())
- if calibration and calibration['hourlyOffset']:
- hoffset = calibration['hourlyOffset']
- from_time = tm + hoffset
- to_time = tm + 3599 + hoffset
- elif AHEAD_TIME_AUDIO_TOLERANCE:
- """ Conventional mode """
- from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
- to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
- log.info('from {} to {}'.format(int(from_time), int(to_time)))
- cursor = db.cursor()
- cursor.execute((
- 'select "filename", "timestamp" '
- 'from "file" '
- 'where "timestamp" between ? and ? '
- 'and "station" = ? '
- 'order by "timestamp" asc'
- ),
- (from_time, to_time, station,),
- )
- files = [file for file in cursor]
- cursor.close()
- for mp3 in files:
- mp3path, ts = mp3
- mp3name = os.path.basename(mp3path)
- yield (mp3path, mp3name, ts)
- def cloud_download(ad_key=None):
- """ Given an ad key, the file is downloaded to
- the system temporal folder to be processed """
- if ad_key in cloud_cache:
- """ If this file has already been downloaded,
- will not be downloaded again, instead will
- be taken from cloud_cache dictionary """
- filename, md5hash = cloud_cache[ad_key]
- if os.path.isfile(filename):
- return filename, md5hash
- ad = fbdb.reference('ads/{}'.format(ad_key)).get()
- filename = os.path.basename(ad['path'])
- out_file = os.path.join(AUDIOS_PATH, filename)
- url = '{}/{}'.format(cloud_base_url, ad['path'])
- if(os.path.isfile(out_file)):
- return out_file, md5_checksum(out_file)
- response = requests.get(url)
- if response.status_code == 200:
- hashes = response.headers['x-goog-hash']
- hashes = hashes.split(',')
- hashes = [h.split('=', 1) for h in hashes]
- hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
- md5sum = hashes['md5']
- with open(out_file, "wb") as fp:
- fp.write(response.content)
- tp = (out_file, md5sum,)
- p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_name', '-of',
- 'default=nokey=1:noprint_wrappers=1', out_file], stdin=PIPE, stdout=PIPE, stderr=PIPE)
- rc = p.returncode
- if rc != 'mp3\n':
- subprocess.call(['mv', out_file, out_file + '.old'])
- subprocess.call(
- ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-codec:a', 'libmp3lame',
- '-qscale:a', '2', '-f', 'mp3', out_file])
- subprocess.call(['rm', '-rf', out_file + '.old'])
- cloud_cache[ad_key] = tp
- return tp
- def download_file(file_path=None):
- file_path_cloud = file_path.replace("/var/fourier/", "")
- url = '{}/{}'.format(cloud_base_url, file_path_cloud)
- response = requests.get(url)
- if response.status_code == 200:
- with open(file_path, "wb") as fp:
- fp.write(response.content)
- cursor = db.cursor()
- cursor.execute('update "file" set uploaded = 0 where filename = ?', (file_path,), )
- cursor.close()
- def get_pendings():
- url = 'https://api.fourier.audio/v1/calendario/pendiente?id={}'.format(config['device_id'], )
- headers = {
- 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
- }
- response = requests.get(url, headers=headers)
- return response.json()
- def send_results(item):
- url = 'https://api.fourier.audio/v1/calendario/resultado'
- headers = {
- 'Authorization': 'Bearer {}'.format(config['apiSecret'], )
- }
- log.info('url: {}'.format(url))
- response = requests.post(url, json=item, headers=headers)
- return response
- def md5_checksum(filename):
- hash_md5 = hashlib.md5()
- with open(filename, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- hash_md5.update(chunk)
- return hash_md5.hexdigest()
- app = setup_endpoint(queue=queue)
- loop = IOLoop.current()
- loop.add_callback(feed_queue)
- if __name__ == '__main__':
- try:
- log.info('Starting ondemand service')
- loop.start()
- except KeyboardInterrupt:
- log.error('Process killed')
|