# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import

# --- Standard library -------------------------------------------------------
import logging as log
import math
import os
import sys
from datetime import datetime, timedelta
from multiprocessing import Process
from subprocess import Popen, PIPE

# --- Third-party ------------------------------------------------------------
import firebase_admin
import mutagen.mp3
from dejavu import Dejavu, CouldntDecodeError
from dejavu.recognize import FilePerSecondRecognizer
from firebase_admin import credentials
from firebase_admin import db as fbdb
from tornado.ioloop import IOLoop
from tornado.web import Application

# --- Local modules ----------------------------------------------------------
from boxconfig import parse_config
from calibration import Calibrations
from client import Client, ConnectionError
from endpoint import setup_endpoint

# The ``Queue`` module was renamed to ``queue`` in Python 3.
if sys.version_info < (3, 0):
    from Queue import Queue, Empty
else:
    from queue import Queue, Empty

log.basicConfig(
    level=log.INFO,
    format='[%(asctime)s] [%(module)s] %(message)s',
)

AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

# Threshold modes.
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

# Queue processing modes:
#   - QUEUE_SINGLE: process only one segment at a time.
#   - QUEUE_THREAD: start one thread per segment.
# The threaded mode is the intended default
# (NOTE(review): verify against the queue_mode actually configured).
# TODO: make this configurable through command-line arguments.
QUEUE_SINGLE = 1
QUEUE_THREAD = 2

# Different worker APIs can be used here:
# the threading one or the multiprocessing one.
MultiAPI = Process

# Module-level setup: runs at import time (reads the box config, builds the
# API client and initializes the Firebase app).
config = parse_config()
queue = Queue()
client = Client(config['device_id'], config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}'.format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/code/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)

# settings
queue_mode = QUEUE_SINGLE
threshold_mode = THRESHOLD_FIXED
db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
# NOTE: the local SQLite database at db_path is not opened here; the
# connection code is currently disabled.
cloud_cache = {}

# Ad file that process_segment fingerprints when no other file is given.
DEFAULT_AD_FILENAME = "/tmp/anuncios/-MOiAvmUkZLmXrAWCy9u.mp3"


def process_segment(audios=None, calibration=None,
                    ad_filename=DEFAULT_AD_FILENAME, segment_size=5):
    """Fingerprint one ad and look for it in a batch of audio files.

    Steps:
      1. Read the ad's duration with mutagen and derive how many matching
         segments are required for a detection (80% of the segments of
         ``segment_size`` seconds that fit in the ad, rounded).
      2. Fingerprint the ad into an in-memory Dejavu database and run
         segment-by-segment recognition over every file in ``audios``.
      3. Group the per-segment results by ``name`` and scan each group with
         ``find_repetitions``.

    :param audios: iterable of ``(path, name, ts)`` tuples. ``ts`` is
        advanced by ``match['length'] / 1000`` per recognized segment, so it
        is presumably a numeric timestamp in seconds (TODO confirm with the
        caller). ``None`` means there are no files to scan.
    :param calibration: passed through to ``find_repetitions``.
    :param ad_filename: path of the MP3 ad to fingerprint.
    :param segment_size: seconds per segment, or ``'integer'`` (ad length
        truncated to an int) or ``'ceil'`` (ad length rounded up to a
        multiple of 5).
    :returns: dict mapping each result ``name`` to the rows returned by
        ``find_repetitions``; ``None`` when the ad cannot be read or
        fingerprinted (the error is logged).
    """
    # 1. Number of segments required, from the total duration of the ad.
    try:
        audio = mutagen.mp3.MP3(ad_filename)
        audio_length = audio.info.length
        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            segment_size = int(math.ceil(audio_length / 5)) * 5
        segments_needed = int(round(float(audio_length) / float(segment_size)))
        # Only 80% of the theoretical segments must match.
        segments_needed = int(round(segments_needed * 0.8))
    except Exception as ex:
        log.error(str(ex))
        return None

    dejavu = Dejavu({"database_type": "mem"})
    try:
        dejavu.fingerprint_file(ad_filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))
        # Without the ad's fingerprint every recognition below is meaningless.
        return None

    # 2. Recognize every audio file segment by segment.
    results = []
    for path, _audio_name, ts in (audios or []):
        if not os.path.isfile(path):
            # Recognizing a missing file would raise an error that is not
            # handled below; skip it instead.
            log.warning('[process_segment] file not found: {}'.format(path))
            continue
        values = []
        try:
            for match in dejavu.recognize(recognizer, path, segment_size):
                results.append({
                    'ad': None,
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                    # TODO: fill with the matched item's name; it is always
                    # None for now, so all results form a single group.
                    'name': None,
                })
                values.append(str(match['confidence']))
                # Float divisor: under Python 2 (no `division` future
                # import) an int length would otherwise be truncated.
                ts += match['length'] / 1000.0
            log.info('[process_segment] {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))

    # 3. Look for runs of matching segments, one group per result name.
    item_ids = []
    for result in results:
        if result['name'] not in item_ids:
            item_ids.append(result['name'])

    encontrados = {}
    try:
        for item_id in item_ids:
            rows = [result for result in results if result["name"] == item_id]
            encontrados[item_id] = find_repetitions(
                rows,
                segments_needed=segments_needed,
                calibration=calibration,
            )
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))
    return encontrados


def find_repetitions(results, segments_needed=2, calibration=None):
    """Return the rows where a run of high-confidence segments starts.

    A run is a sequence of results whose ``confidence`` is at or above the
    threshold. When ``calibration['fallTolerance']`` is truthy, one
    below-threshold segment inside a run is tolerated (and counted towards
    the run length) if the next segment is back above the threshold; a
    second consecutive low segment breaks the run. When a run reaches
    ``segments_needed`` segments, the row where it started is recorded and
    results are skipped until a confidence at or below the threshold.

    The threshold is ``calibration['threshold']`` in ``THRESHOLD_FIXED``
    mode, or the ceiling of the mean confidence in ``THRESHOLD_AVERAGE``
    mode (selected by the module-level ``threshold_mode``).

    :param results: list of dicts with at least a ``'confidence'`` key.
    :param segments_needed: minimum run length; values below 1 become 1.
    :param calibration: mapping with ``'threshold'`` and ``'fallTolerance'``
        keys; ``None`` uses ``THRESHOLD`` and ``FALL_TOLERANCE_SEGMENTS``.
    :returns: list of the rows of ``results`` that start each detection.
    :raises ValueError: if ``threshold_mode`` is not a known mode.
    """
    if calibration is None:
        calibration = {
            'threshold': THRESHOLD,
            'fallTolerance': FALL_TOLERANCE_SEGMENTS,
        }
    fall_tolerance = calibration['fallTolerance']

    if not results:
        # Nothing to scan (and the average threshold would divide by zero).
        return []

    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))
    else:
        raise ValueError('unknown threshold_mode: {}'.format(threshold_mode))

    if segments_needed < 1:
        segments_needed = 1

    found = []
    found_counter = 0       # segments in the current run (incl. tolerated lows)
    found_down_counter = 0  # tolerated low segments not yet added to the run
    found_index = None      # index of the first in-threshold segment of the run
    expect_space = False    # after a detection, wait for a low value
    expect_recover = False  # one low tolerated, waiting for a high value

    # TODO: a shortcut for single very-high (>= 100) or several mid-high
    # (>= 50) confidences was sketched here before and is not implemented.
    for index, result in enumerate(results):
        confidence = result['confidence']
        if expect_space:
            if confidence <= threshold:
                expect_space = False
        elif confidence >= threshold:
            found_counter += 1
            if found_index is None:
                found_index = index
            if expect_recover:
                # The tolerated low segment belongs to this run; count it
                # once and clear it so later drops are not double-counted.
                found_counter += found_down_counter
                found_down_counter = 0
                expect_recover = False
        elif fall_tolerance and not expect_recover and found_index is not None:
            # A low value is only tolerated once the current run has at
            # least one in-threshold segment.
            expect_recover = True
            found_down_counter += 1
        else:
            # Second consecutive low value, or no tolerance: drop the run.
            found_counter = 0
            found_down_counter = 0
            found_index = None
            expect_recover = False

        if found_counter >= segments_needed:
            found.append(results[found_index])
            # Start the next search from a clean state so a later run does
            # not inherit this run's start index or tolerated drops.
            found_counter = 0
            found_down_counter = 0
            found_index = None
            expect_recover = False
            expect_space = True

    return found


app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(process_segment)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')