# -*- coding: utf8 -*-
from __future__ import print_function, absolute_import
from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from ondemand.endpoint import setup_endpoint
from ondemand.calibration import Calibrations
from fourier.dejavu import Dejavu, CouldntDecodeError
from firebase_admin import credentials
from firebase_admin import db as fbdb
from binascii import hexlify
from base64 import b64decode
from threading import Thread
from multiprocessing import Process
from argparse import ArgumentParser
from subprocess import Popen, PIPE
import logging as log
import firebase_admin
import mutagen.mp3
import OpenSSL.SSL
import subprocess
import requests
import dateutil.parser
import sqlite3
import math
import time
import sys
import os

if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty

log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s',
                level=log.INFO)

AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

# THRESHOLD modes
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

# Queue processing modes:
# - QUEUE_SINGLE: processes a single segment at a time
# - QUEUE_THREAD: starts a thread for each segment
# The threaded mode is used by default.
# TODO: make this configurable through execution arguments.
QUEUE_SINGLE = 1
QUEUE_THREAD = 2

# Two concurrency APIs can be used here:
# threading and multiprocessing.
MultiAPI = Process

config = parse_config()
queue = Queue()
client = Client(config['device_id'], config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}'\
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
dejavu = Dejavu({"database_type": "mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)

# settings
queue_mode = QUEUE_THREAD
threshold_mode = THRESHOLD_FIXED

db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
cloud_cache = {}


def feed_queue():
    """ Search for pending scheduled work on the server and
    add the items to an in-memory queue. """
    try:
        response = client.get_schedule_pending()
        downloaded_counter = len(response['items'])
        for item in response['items']:
            queue.put(item)

        if downloaded_counter:
            log.info(('[feed_queue] {} new ' +
                      'pending schedule items.')
                     .format(downloaded_counter))

        if queue.qsize() > 0:
            if queue_mode == QUEUE_THREAD:
                loop.add_callback(process_queue_with_threads)
            else:
                loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)
    except ConnectionError as ex:
        log.error('[feed_queue] cannot feed: {}, retrying later'.format(ex))
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as ex:
        # Unknown errors
        log.error('[feed_queue] {}'.format(ex))
        loop.add_timeout(time.time() + 60, feed_queue)
        raise ex

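# The schedule items consumed by process_queue/process_segment below are
# plain dictionaries coming from the Fourier API. Judging only by how they
# are accessed in this module, each item is assumed to look roughly like
# this (illustrative sketch, not a verified API contract):
#
#   {
#       'id': '...',                     # schedule-item id
#       'schedule': '...',               # parent schedule id
#       'station': '...',                # station identifier
#       'date': '2018-01-01T12:00:00',   # parsed with dateutil.parser
#       'ad': '...',                     # firebase key of the ad to match
#   }
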
def process_queue():
    """ Try to get the next item in the queue and start processing
    it accordingly. On success, repeat, or go back to feeding when
    no more items remain. """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        log.error(ex)
        loop.add_callback(process_queue)


def process_queue_with_threads():
    threads = [None] * MAX_SEGMENT_THREADS
    is_drained = False

    log.info('Starting thread processing')

    while True:
        for index, t in enumerate(threads):
            if not t:
                # station is initialized before the try block so the
                # error handler below can reference it safely.
                station = None
                try:
                    item = queue.get(False)
                    station = item['station']
                    date = dateutil.parser.parse(item['date'])
                    calibration = calibrations.get(station)
                    audios = [f for f in iterate_audios(
                        date, station,
                        calibration=calibration,
                    )]
                    thread = MultiAPI(target=process_segment,
                                      args=(item,),
                                      kwargs={
                                          'audios': audios,
                                          'calibration': calibration,
                                      })
                    threads[index] = thread
                    thread.start()
                except Empty:
                    is_drained = True
                except Exception as err:
                    log.error('[process_queue_with_threads] [{}] {}'.format(
                        station,
                        err,
                    ))
                    continue
            elif not t.is_alive():
                # The worker finished; free its slot for the next item.
                threads[index] = None

        if is_drained:
            if threads.count(None) == MAX_SEGMENT_THREADS:
                break

    log.info('Finished thread processing')
    loop.add_callback(feed_queue)

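# Worked example of the segment arithmetic in process_segment below
# (illustrative values only): for a 47.3 s ad file,
#
#   segment_size == 'integer'  ->  int(47.3)                    == 47
#   segment_size == 'ceil'     ->  int(math.ceil(47.3 / 5)) * 5 == 50
#
# and with segment_size = 50 and tolerance = 0.6 the calibration yields
#
#   segments_needed = round(47.3 / 50)  ->  1
#   segments_needed = round(1 * 0.6)    ->  1
#
# (find_repetitions additionally clamps segments_needed to a minimum of 1).
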
def process_segment(item, audios=None, calibration=None):
    """ Process one hour of audio. """
    station = item['station']

    if not calibration:
        calibration = calibrations.get(station)

    tolerance = calibration['tolerance']
    date = dateutil.parser.parse(item['date'])
    segment_size = calibration['segmentSize']
    audio_length = 0

    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}'
             .format(
                 calibration['threshold'],
                 calibration['tolerance'],
                 calibration['fallTolerance'],
                 calibration['segmentSize'],
                 calibration['hourlyOffset'],
                 item,
             ))

    # 1. Fetch the ad audio from firebase
    # and compute its fingerprint.
    try:
        filename, md5hash = cloud_download(ad_key=item['ad'])
        if not filename:
            log.info('[process_segment] ad file missing')
            return
    except Exception as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 1.1 Compute the number of segments required
    # according to the total length of the audio.
    try:
        audio = mutagen.mp3.MP3(filename)
        audio_length = audio.info.length

        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            segment_size = int(math.ceil(audio_length / 5)) * 5

        segments_needed = int(round(float(audio_length) / float(segment_size)))
        segments_needed = int(round(segments_needed * tolerance))
    except Exception as ex:
        log.error('[process_segment] file {} is not an mp3'.format(filename))
        log.error(str(ex))
        return

    try:
        dejavu.fingerprint_file(filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))

    """ There are two possible scenarios when obtaining the audios:
        a. The audios come through the function's "audios" parameter,
           as a list.
        b. The audios are fetched directly from the database through
           a cursor.
    """
    try:
        audios_iterable = audios if audios \
            else iterate_audios(date, station, calibration=calibration)
    except sqlite3.OperationalError as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 2. Read the list of files from the local database.
    audios_counter = 0
    results = []

    for path, name, ts in audios_iterable:
        short_path = os.path.join(station, name)
        audios_counter += os.path.isfile(path)
        values = []

        if not os.path.isfile(path):
            log.error('[process_segment] file not found: {}'
                      .format(short_path))
            continue

        try:
            for match in dejavu.recognize(recognizer, path, segment_size,
                                          ads_filter=[md5hash]):
                results.append({
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                })
                values.append(str(match['confidence']))
                ts += match['length'] / 1000

            log.info('[process_segment] [{3}] {2} {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
                item['ad'],
                station,
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))

    try:
        response = client.put_schedule_results(
            item['schedule'],
            item['id'],
            None,  # TODO: send results again
            found=find_repetitions(results,
                                   segments_needed=segments_needed,
                                   calibration=calibration,
                                   ),
            missing_files=(12 - audios_counter)
            if audios_counter < 12 else 0,
        )
        log.info('[{}] API response: {}'.format(station, response))
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))


def find_repetitions(results, segments_needed=2, calibration=None):
    found_counter = 0
    found_down_counter = 0
    found_index = None
    expect_space = False
    expect_recover = False
    last_value_in_threshold_index = -1
    fall_tolerance = calibration['fallTolerance']
    found = []

    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))

    if segments_needed < 1:
        segments_needed = 1

    for index, result in enumerate(results):
        if not expect_space:
            if result['confidence'] >= threshold:
                found_counter += 1
                last_value_in_threshold_index = index
                if found_index is None:
                    found_index = index
                if expect_recover:
                    found_counter += found_down_counter
                    expect_recover = False
            elif fall_tolerance:
                if not expect_recover:
                    if last_value_in_threshold_index != -1:
                        """ Only once at least one value has entered the
                            threshold range can a low value be expected
                            (and tolerated). """
                        expect_recover = True
                        found_down_counter += 1
                    else:
                        pass
                else:
                    """ If, after one element has been tolerated, another
                        value falls outside the threshold again, the
                        sequence is considered lost. """
                    found_counter = 0
                    found_down_counter = 0
                    found_index = None
                    expect_recover = False
            else:
                found_counter = 0
                found_down_counter = 0
                found_index = None
                expect_recover = False
        else:
            if result['confidence'] <= threshold:
                expect_space = False

        if found_counter >= segments_needed:
            found.append(results[found_index]['timestamp'])
            found_counter = 0
            expect_space = True

    return found

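# Illustrative trace of find_repetitions above (hypothetical numbers):
# with threshold = 10, fall_tolerance = 1 and segments_needed = 3, the
# confidence series [12, 11, 4, 13, 2, 2] yields one repetition at the
# timestamp of the first value: 12 and 11 are in range, the single dip
# to 4 is tolerated, and 13 recovers it (counter: 1, 2, 2, 4), reaching
# segments_needed; expect_space then suppresses further matches until a
# sub-threshold value arrives.
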
def iterate_audios(dt, station, calibration=None):
    """ Given a datetime object and a station, iterate over the files
    whose timestamps fall between that date and one hour later;
    the station must match too. """
    tm = time.mktime(dt.timetuple())

    if calibration and calibration['hourlyOffset']:
        hoffset = calibration['hourlyOffset']
        from_time = tm + hoffset
        to_time = tm + 3599 + hoffset
    elif AHEAD_TIME_AUDIO_TOLERANCE:
        """ Conventional mode """
        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE

    log.info('from {} to {}'.format(int(from_time), int(to_time)))

    cursor = db.cursor()
    cursor.execute((
        'select "filename", "timestamp" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
    ),
        (from_time, to_time, station, ),
    )
    files = [file for file in cursor]
    cursor.close()

    for mp3 in files:
        mp3path, ts = mp3
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)


def cloud_download(ad_key=None):
    """ Given an ad key, download the file to the system's temporal
    folder so it can be processed. """
    if ad_key in cloud_cache:
        """ If this file has already been downloaded, it will not be
            downloaded again; it is taken from the cloud_cache
            dictionary instead. """
        filename, md5hash = cloud_cache[ad_key]
        if os.path.isfile(filename):
            return filename, md5hash

    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])

    response = requests.get(url)

    if response.status_code == 200:
        # Google Cloud Storage reports object checksums in the
        # x-goog-hash header as comma-separated, base64-encoded
        # pairs, e.g. "crc32c=...,md5=...".
        hashes = response.headers['x-goog-hash']
        hashes = hashes.split(',')
        hashes = [h.split('=', 1) for h in hashes]
        hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
        md5sum = hashes['md5']

        with open(out_file, "wb") as fp:
            fp.write(response.content)

        tp = (out_file, md5sum,)

        # Probe the audio codec with ffprobe; transcode to mp3 if needed.
        p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0',
                   '-show_entries', 'stream=codec_name',
                   '-of', 'default=nokey=1:noprint_wrappers=1',
                   out_file],
                  stdin=PIPE, stdout=PIPE, stderr=PIPE)
        stdout, _ = p.communicate()
        codec = stdout.decode('utf8', 'replace').strip()
        log.info('[cloud_download] file {} codec={}'.format(out_file, codec))

        if codec != 'mp3':
            log.info('[cloud_download] file {} is not mp3'.format(out_file))
            subprocess.call(['mv', out_file, out_file + '.old'])
            subprocess.call(['ffmpeg', '-hide_banner', '-loglevel', 'panic',
                             '-i', out_file + '.old', '-f', 'mp3', out_file])
            subprocess.call(['rm', '-rf', out_file + '.old'])

        cloud_cache[ad_key] = tp
        return tp

    # Download failed; keep the tuple shape so callers can unpack it.
    return None, None


app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')