# -*- coding: utf-8 -*-
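"""On-demand ad recognition service.

Pulls pending schedule items from the Fourier API, downloads the referenced
ad audio (metadata from Firebase, files from Cloud Storage), fingerprints the
ads with Dejavu, scans the locally recorded station audio for repetitions and
posts the results back to the API. The main loop runs on a Tornado IOLoop.
"""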
 
from __future__ import print_function, absolute_import

from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from ondemand.endpoint import setup_endpoint
from ondemand.calibration import Calibrations
from fourier.dejavu import Dejavu, CouldntDecodeError
from firebase_admin import credentials
from firebase_admin import db as fbdb
from binascii import hexlify
from base64 import b64decode
from threading import Thread
from multiprocessing import Process
from argparse import ArgumentParser
from subprocess import Popen, PIPE

import logging as log
import firebase_admin
import mutagen.mp3
import OpenSSL.SSL
import subprocess
import requests
import dateutil.parser  # dateutil.parser is used below; plain "import dateutil" does not expose it
import hashlib
import sqlite3
import math
import time
import sys
import os
 
if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty

log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
 
AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

# Threshold modes
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

# Queue processing modes:
#  - QUEUE_SINGLE: process only one segment at a time
#  - QUEUE_THREAD: start a thread (or process) for each segment
# The active mode is selected below via queue_mode.
# TODO: make this configurable through command-line arguments.
QUEUE_SINGLE = 1
QUEUE_THREAD = 2

# Either the threading or the multiprocessing API can be used here.
MultiAPI = Process
 
config = parse_config()
queue = Queue()
client = Client(config['device_id'],
                config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}' \
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)

# settings
queue_mode = QUEUE_SINGLE
threshold_mode = THRESHOLD_FIXED

db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
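# Minimal sketch of the local table this service reads and updates; the column
# names are taken from the queries below, the types are an assumption:
#
#   CREATE TABLE IF NOT EXISTS "file" (
#       "filename"  TEXT,
#       "timestamp" INTEGER,
#       "station"   TEXT,
#       "uploaded"  INTEGER
#   );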
 
cloud_cache = {}
 
def feed_queue():
    """ Look for pending scheduled work on the server
    and add it to the in-memory queue. """
    try:
        response = get_pendings()
        # response = client.get_schedule_pending()
        # downloaded_counter = len(response['items'])
        # for item in response['items']:
        if len(response["elementos"]) > 0:
            queue.put(response)
        if queue.qsize() > 0:
            if queue_mode == QUEUE_THREAD:
                loop.add_callback(process_queue_with_threads)
            else:
                loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)
    except ConnectionError as ex:
        log.error('[feed_queue] cannot feed: {}, retrying later'.format(ex))
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as ex:
        # Unknown errors
        log.error('[feed_queue] {}'.format(ex))
        loop.add_timeout(time.time() + 60, feed_queue)
        raise ex
 
def process_queue():
    """ Try to get the next item from the queue and
    process it. On success repeat; when the queue is
    empty, go back to feeding it. """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        log.error(ex)
        loop.add_callback(process_queue)
 
def process_queue_with_threads():
    threads = [None] * MAX_SEGMENT_THREADS
    is_drained = False
    station = None
    log.info('Starting thread processing')
    while True:
        for index, t in enumerate(threads):
            if not t:
                try:
                    item = queue.get(False)
                    # The API payload uses Spanish keys (see process_segment).
                    station = item['estacion']
                    date = dateutil.parser.parse(item['fecha'], ignoretz=True)
                    calibration = calibrations.get(station)
                    audios = [f for f in iterate_audios(
                        date, station,
                        calibration=calibration
                    )]
                    thread = MultiAPI(target=process_segment,
                                      args=(item,),
                                      kwargs={
                                          'audios': audios,
                                          'calibration': calibration,
                                      })
                    threads[index] = thread
                    thread.start()
                except Empty:
                    is_drained = True
                except Exception as err:
                    log.error('[process_queue_with_threads] [{}] {}'.format(
                        station,
                        err,
                    ))
                    continue
            elif not t.is_alive():
                threads[index] = None

        if is_drained:
            if threads.count(None) == MAX_SEGMENT_THREADS:
                break
        time.sleep(0.1)  # avoid busy-waiting while workers run
    log.info('Finished thread processing')
    loop.add_callback(feed_queue)
 
def process_segment(item, audios=None, calibration=None):
    """ Process one hour of audio for a station. """
    station = item['estacion']
    if not calibration:
        calibration = calibrations.get(station)
    tolerance = calibration['tolerance']
    date = dateutil.parser.parse(item['fecha'], ignoretz=True)
    segment_size = calibration['segmentSize']
    audio_length = 0
    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
        .format(
            calibration['threshold'],
            calibration['tolerance'],
            calibration['fallTolerance'],
            calibration['segmentSize'],
            calibration['hourlyOffset'],
            item,
        )
    )

    # 1. Fetch the ad audio (metadata from Firebase, files from
    #    Cloud Storage) so it can be fingerprinted below.
    try:
        filenames = []
        id_by_ad = {}
        item_ids = []
        x = 0
        for i in item["elementos"]:
            x = x + 1
            log.info('[process_segment] downloading ad {} {}'.format(x, i["anuncio"]))
            id_by_ad[i['anuncio']] = i['id']
            if i['id'] not in item_ids:
                item_ids.append(i['id'])
            filename, md5hash = cloud_download(ad_key=i["anuncio"])
            if filename:
                filenames.append((filename, md5hash))
            else:
                log.info('[process_segment] ad file missing')
    except Exception as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 1.1 Compute the number of matching segments required,
    #     based on the total duration of the ad audio.
    try:
        filename, md5hash = filenames[0]
        audio = mutagen.mp3.MP3(filename)
        audio_length = audio.info.length
        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            segment_size = int(math.ceil(audio_length / 5)) * 5
        segments_needed = int(round(float(audio_length) / float(segment_size)))
        segments_needed = int(round(segments_needed * tolerance))
    except Exception as ex:
        log.error('[process_segment] file {} is not an mp3'.format(filename))
        log.error(str(ex))
        return
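    # Worked example with hypothetical calibration values: for a 20-second ad,
    # a numeric segmentSize of 5 and a tolerance of 0.6, the block above gives
    # segments_needed = round(20 / 5) = 4 and then round(4 * 0.6) = 2, i.e. at
    # least two matching segments are required by find_repetitions().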
 
    dejavu = Dejavu({"database_type": "mem"})
    try:
        for i in filenames:
            filename = i[0]
            dejavu.fingerprint_file(filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))

    # There are two possible scenarios for obtaining the audios:
    #   a. The audios come in through the "audios" parameter of this
    #      function, as a list.
    #   b. The audios are read directly from the local database
    #      through a cursor.
    try:
        audios_iterable = audios if audios \
            else iterate_audios(date, station, calibration=calibration)
    except sqlite3.OperationalError as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 2. Read the list of files from the local database
    audios_counter = 0
    results = []
    v = []
    for path, name, ts in audios_iterable:
        # short_path = os.path.join(station, name)
        audios_counter += os.path.isfile(path)
        values = []
        if not os.path.isfile(path):
            download_file(path)
        try:
            for match in dejavu.recognize(recognizer, path, segment_size):
                name = None
                ad = None
                try:
                    ad = match['name']
                    if match['name'] in id_by_ad.keys():
                        name = id_by_ad[match['name']]
                    else:
                        name = match['name']
                except KeyError:
                    pass
                results.append({
                    'ad': ad,
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                    'name': name,
                })
                values.append(str(match['confidence']))
                ts += match['length'] / 1000
            v.append(','.join(values))
            log.info('[process_segment] [{2}] {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
                station,
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))

    try:
        encontrados = {}
        for i in item_ids:
            r = [result for result in results if result["name"] == i]
            encontrados[i] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration)
        for id in encontrados:
            for e in encontrados[id]:
                for i in item['elementos']:
                    if i['id'] == id and i['anuncio'] == e['ad']:
                        if 'encontrados' not in i:
                            i['encontrados'] = []
                        i['encontrados'].append(e)
                        break
        item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
        response = send_results(item)
        log.info('[{}] API response: {}'.format(station, response))
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))
 
def find_repetitions(results, segments_needed=2, calibration=None):
    found_counter = 0
    found_down_counter = 0
    found_index = None
    expect_space = False
    expect_recover = False
    last_value_in_threshold_index = -1
    fall_tolerance = calibration['fallTolerance']
    found = []

    high = 100  # TODO: get this value from a parameter
    middle_high = 50  # TODO: get this value from a parameter
    segment_middle_needed = 2  # TODO: get this value from a parameter
    found_high = None
    found_middle_high = []

    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))

    if segments_needed < 1:
        segments_needed = 1

    for index, result in enumerate(results):
        #if result['confidence'] >= high:
        #    if found_high is None:
        #        found_high = index
        #    elif result['confidence'] > results[found_high]['confidence']:
        #        found_high = index
        #elif result['confidence'] >= middle_high:
        #    found_middle_high.append(index)

        if not expect_space:
            if result['confidence'] >= threshold:
                found_counter += 1
                last_value_in_threshold_index = index
                if found_index is None:
                    found_index = index
                if expect_recover:
                    found_counter += found_down_counter
                    expect_recover = False
            elif fall_tolerance:
                if not expect_recover:
                    if last_value_in_threshold_index != -1:
                        # Only after at least one value has entered the
                        # threshold range can a low value be tolerated.
                        expect_recover = True
                        found_down_counter += 1
                    else:
                        pass
                else:
                    # If, after one tolerated element, another value falls
                    # outside the continuous threshold, the run is lost.
                    found_counter = 0
                    found_down_counter = 0
                    found_index = None
                    expect_recover = False
            else:
                found_counter = 0
                found_down_counter = 0
                found_index = None
                expect_recover = False
                # Here we would check whether there is a high value
                #if found_high is not None:
                #    found_row = results[found_high]
                #    found.append(found_row)
                #elif len(found_middle_high) >= segment_middle_needed:
                #    found_row = results[found_middle_high[0]]
                #    found.append(found_row)
                #found_high = None
                #found_middle_high = []
        else:
            if result['confidence'] <= threshold:
                expect_space = False

        if found_counter >= segments_needed:
            found_row = results[found_index]
            found.append(found_row)
            found_counter = 0
            expect_space = True
            #found_high = None
            #found_middle_high = []

    return found
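# Worked example for find_repetitions() above, with hypothetical numbers:
# using THRESHOLD_FIXED with calibration['threshold'] == 10,
# calibration['fallTolerance'] == 1 and segments_needed == 2, a confidence
# series of [12, 4, 15, 3, 3] yields exactly one match anchored at the first
# result: the dip to 4 is tolerated, the 15 recovers the run (counter reaches
# 3 >= 2), and the trailing low values reset the state.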
 
def iterate_audios(dt, station, calibration=None):
    """ Given a datetime object and a station, iterate over
    the recorded files whose timestamps fall within the hour
    starting at that datetime; the station must match too. """
    tm = time.mktime(dt.timetuple())
    if calibration and calibration['hourlyOffset']:
        hoffset = calibration['hourlyOffset']
        from_time = tm + hoffset
        to_time = tm + 3599 + hoffset
    elif AHEAD_TIME_AUDIO_TOLERANCE:
        # Conventional mode
        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    log.info('from {} to {}'.format(int(from_time), int(to_time)))
    cursor = db.cursor()
    cursor.execute((
        'select "filename", "timestamp" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
    ),
        (from_time, to_time, station,),
    )
    files = [file for file in cursor]
    cursor.close()
    for mp3 in files:
        mp3path, ts = mp3
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)
 
def cloud_download(ad_key=None):
    """ Given an ad key, download the file to the system's
    temporary folder so it can be processed. """
    if ad_key in cloud_cache:
        # If this file has already been downloaded, it is taken from the
        # cloud_cache dictionary instead of being downloaded again.
        filename, md5hash = cloud_cache[ad_key]
        if os.path.isfile(filename):
            return filename, md5hash

    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])

    if os.path.isfile(out_file):
        return out_file, md5_checksum(out_file)

    response = requests.get(url)
    if response.status_code == 200:
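        # Note: the x-goog-hash header from Cloud Storage typically looks like
        # 'crc32c=AAAAAA==,md5=<base64 digest>' (comma-separated, base64-encoded
        # digests); the lines below turn it into a dict of hex digests.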
 
        hashes = response.headers['x-goog-hash']
        hashes = hashes.split(',')
        hashes = [h.split('=', 1) for h in hashes]
        hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
        md5sum = hashes['md5']
        with open(out_file, "wb") as fp:
            fp.write(response.content)
        tp = (out_file, md5sum,)

        # Check the real codec of the downloaded file; if it is not mp3,
        # re-encode it with ffmpeg so it can be fingerprinted.
        p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0',
                   '-show_entries', 'stream=codec_name', '-of',
                   'default=nokey=1:noprint_wrappers=1', out_file],
                  stdin=PIPE, stdout=PIPE, stderr=PIPE)
        stdout, _ = p.communicate()
        codec = stdout.decode('utf-8', 'ignore').strip()
        if codec != 'mp3':
            subprocess.call(['mv', out_file, out_file + '.old'])
            subprocess.call(
                ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old',
                 '-codec:a', 'libmp3lame', '-qscale:a', '2', '-f', 'mp3', out_file])
            subprocess.call(['rm', '-rf', out_file + '.old'])
        cloud_cache[ad_key] = tp
        return tp

    # The download failed; callers check for a missing file.
    return None, None
 
def download_file(file_path=None):
    file_path_cloud = file_path.replace("/var/fourier/", "")
    url = '{}/{}'.format(cloud_base_url, file_path_cloud)
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_path, "wb") as fp:
            fp.write(response.content)
        cursor = db.cursor()
        cursor.execute('update "file" set uploaded = 0 where filename = ?', (file_path,))
        cursor.close()
        db.commit()  # sqlite3 does not autocommit; persist the flag change
 
def get_pendings():
    url = 'https://api.fourier.audio/v1/calendario/pendiente?id={}'.format(config['device_id'])
    headers = {
        'Authorization': 'Bearer {}'.format(config['apiSecret'])
    }
    response = requests.get(url, headers=headers)
    return response.json()
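# The pending-work payload returned by get_pendings() is consumed by
# feed_queue() and process_segment() roughly as follows (field list inferred
# from the code and possibly incomplete):
#
#   {
#       "estacion": "<station name>",
#       "fecha": "<date/time string parsed with dateutil>",
#       "elementos": [
#           {"id": <schedule item id>, "anuncio": "<ad key>", ...},
#           ...
#       ]
#   }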
 
def send_results(item):
    url = 'https://api.fourier.audio/v1/calendario/resultado'
    headers = {
        'Authorization': 'Bearer {}'.format(config['apiSecret'])
    }
    log.info('url: {}'.format(url))
    response = requests.post(url, json=item, headers=headers)
    return response
 
def md5_checksum(filename):
    hash_md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
 
app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')
 
 