# -*- coding: utf8 -*-
"""On-demand audio recognition service.

Polls the Fourier API for pending scheduled work, downloads the
advertisement audio files from cloud storage, fingerprints them with an
in-memory Dejavu instance and matches them against the station
recordings referenced in the local sqlite database, then posts the
repetitions found back to the API.
"""
from __future__ import print_function, absolute_import
from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from ondemand.endpoint import setup_endpoint
from ondemand.calibration import Calibrations
from fourier.dejavu import Dejavu, CouldntDecodeError
from firebase_admin import credentials
from firebase_admin import db as fbdb
from binascii import hexlify
from base64 import b64decode
from threading import Thread
from multiprocessing import Process
from argparse import ArgumentParser
from subprocess import Popen, PIPE
import logging as log
import firebase_admin
import mutagen.mp3
import OpenSSL.SSL
import subprocess
import requests
import dateutil
# fix: `import dateutil` alone does NOT make `dateutil.parser` available;
# the calls to dateutil.parser.parse() below would raise AttributeError.
import dateutil.parser
import hashlib
import sqlite3
import math
import time
import sys
import os

if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty

log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)

AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # second
MAX_SEGMENT_THREADS = 4
THRESHOLD = 10
SEGMENTS_TOLERANCE_RATE = 0.6
FALL_TOLERANCE_SEGMENTS = 1

# Threshold modes for find_repetitions().
THRESHOLD_FIXED = 1
THRESHOLD_AVERAGE = 2

# Queue processing modes:
#  - QUEUE_SINGLE: processes one segment at a time
#  - QUEUE_THREAD: starts a worker for each segment
# TODO: make this configurable through command-line arguments.
QUEUE_SINGLE = 1
QUEUE_THREAD = 2

# Either the threading or the multiprocessing API can be used
# for the per-segment workers; Process is the current choice.
MultiAPI = Process

config = parse_config()
queue = Queue()
client = Client(config['device_id'],
                config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}' \
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
device_ref = fbdb.reference('devices').child(config['device_id'])
calibrations = Calibrations(config['device_id'], client=client)

# settings
queue_mode = QUEUE_SINGLE
threshold_mode = THRESHOLD_FIXED

db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
cloud_cache = {}


def feed_queue():
    """Search the server for pending scheduled work and put it on the
    in-memory queue, then hand control to the appropriate processor.

    Reschedules itself on the IOLoop: 30s when idle, 15s after a
    connection error, 60s after an unknown error.
    """
    try:
        response = get_pendings()
        if len(response["elementos"]) > 0:
            queue.put(response)
        if queue.qsize() > 0:
            if queue_mode == QUEUE_THREAD:
                loop.add_callback(process_queue_with_threads)
            else:
                loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)
    except ConnectionError as ex:
        log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as ex:
        # Unknown errors: log, schedule a retry, and still surface the
        # exception to the IOLoop so it is not silently swallowed.
        log.error('[feed_queue] {}'.format(ex))
        loop.add_timeout(time.time() + 60, feed_queue)
        raise


def process_queue():
    """Take the next item from the queue and process it.

    On success, repeats itself; when the queue is empty, goes back to
    feed_queue(); on error, logs and continues with the next item.
    """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        log.error(ex)
        loop.add_callback(process_queue)


def process_queue_with_threads():
    """Drain the queue using up to MAX_SEGMENT_THREADS parallel workers.

    Each queue item is dispatched to a MultiAPI (Process) worker running
    process_segment(); the audio list is pre-fetched here because the
    sqlite connection cannot be shared with the workers.
    """
    threads = [None] * MAX_SEGMENT_THREADS
    is_drained = False
    log.info('Starting thread processing')
    while True:
        for index, t in enumerate(threads):
            if not t:
                # fix: pre-bind station so the error log below cannot
                # raise NameError if the item is malformed.
                station = None
                try:
                    item = queue.get(False)
                    station = item['station']
                    date = dateutil.parser.parse(item['date'], ignoretz=True)
                    calibration = calibrations.get(station)
                    audios = [f for f in iterate_audios(
                        date, station,
                        calibration=calibration
                    )]
                    thread = MultiAPI(target=process_segment,
                                      args=(item,),
                                      kwargs={
                                          'audios': audios,
                                          'calibration': calibration,
                                      }
                                      )
                    threads[index] = thread
                    thread.start()
                except Empty:
                    is_drained = True
                except Exception as err:
                    log.error('[process_queue_with_threads] [{}] {}'.format(
                        station,
                        err,
                    ))
                    continue
            elif not t.is_alive():
                threads[index] = None
        if is_drained:
            if threads.count(None) == MAX_SEGMENT_THREADS:
                break
        # fix: avoid a 100% CPU busy-wait while workers are running.
        time.sleep(0.1)
    log.info('Finished thread processing')
    loop.add_callback(feed_queue)


def process_segment(item, audios=None, calibration=None):
    """Process one hour of audio for the station referenced by *item*.

    Downloads and fingerprints the scheduled ads, matches them against
    the station recordings and reports the repetitions found.

    :param item: schedule dict from the API ('estacion', 'fecha',
        'elementos', ...).
    :param audios: optional pre-fetched list of (path, name, ts) tuples;
        when None they are read from the local database.
    :param calibration: optional calibration dict; fetched by station
        when None.
    """
    station = item['estacion']
    if not calibration:
        calibration = calibrations.get(station)
    tolerance = calibration['tolerance']
    date = dateutil.parser.parse(item['fecha'], ignoretz=True)
    segment_size = calibration['segmentSize']
    audio_length = 0
    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
        .format(
            calibration['threshold'],
            calibration['tolerance'],
            calibration['fallTolerance'],
            calibration['segmentSize'],
            calibration['hourlyOffset'],
            item,
        )
    )

    # 1. Download the ad audios from firebase/cloud storage and map
    #    each ad key to its schedule element id.
    try:
        filenames = []
        id_by_ad = {}
        item_ids = []
        x = 0
        for i in item["elementos"]:
            x = x + 1
            log.info('[process_segment] downloading ad {} {}'.format(x, i["anuncio"]))
            id_by_ad[i['anuncio']] = i['id']
            if i['id'] not in item_ids:
                item_ids.append(i['id'])
            filename, md5hash = cloud_download(ad_key=i["anuncio"])
            if filename:
                filenames.append((filename, md5hash))
            else:
                log.info('[process_segment] ad file missing')
    except Exception as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # fix: with no downloaded files, filenames[0] below raised IndexError
    # and the except handler itself crashed on the unbound `filename`.
    if not filenames:
        log.error('[process_segment] [{}] no ad files available'.format(station))
        return

    # 1.1 Compute the number of segments required from the total
    #     duration of the (first) ad audio.
    try:
        filename, md5hash = filenames[0]
        audio = mutagen.mp3.MP3(filename)
        audio_length = audio.info.length
        if segment_size == 'integer':
            segment_size = int(audio_length)
        elif segment_size == 'ceil':
            # round the segment size up to the next multiple of 5 seconds
            segment_size = int(math.ceil(audio_length / 5)) * 5
        segments_needed = int(round(float(audio_length) / float(segment_size)))
        segments_needed = int(round(segments_needed * tolerance))
    except Exception as ex:
        log.error('[process_segment] file {} is not an mp3'.format(filename))
        log.error(str(ex))
        return

    # Fingerprint every downloaded ad into a fresh in-memory Dejavu DB.
    dejavu = Dejavu({"database_type": "mem"})
    try:
        for filename, _md5 in filenames:
            dejavu.fingerprint_file(filename)
    except Exception as ex:
        log.error('[process_segment] cannot fingerprint: {}'.format(ex))

    # The recordings come either from the `audios` parameter (a list,
    # pre-fetched by the threaded path) or straight from the local
    # database as a generator.
    try:
        audios_iterable = audios if audios \
            else iterate_audios(date, station, calibration=calibration)
    except sqlite3.OperationalError as err:
        log.error('[process_segment] [{}] {}'.format(station, err))
        return

    # 2. Recognize the ads inside each recording file.
    audios_counter = 0
    results = []
    for path, name, ts in audios_iterable:
        audios_counter += os.path.isfile(path)
        values = []
        if not os.path.isfile(path):
            download_file(path)
        try:
            for match in dejavu.recognize(recognizer, path, segment_size):
                name = None
                ad = None
                try:
                    ad = match['name']
                    if match['name'] in id_by_ad.keys():
                        name = id_by_ad[match['name']]
                    else:
                        name = match['name']
                except KeyError:
                    pass
                results.append({
                    'ad': ad,
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                    'name': name
                })
                values.append(str(match['confidence']))
                # advance the timestamp by the matched chunk length (ms -> s)
                ts += match['length'] / 1000
            log.info('[process_segment] [{2}] {0}) {1}'.format(
                os.path.split(path)[-1],
                ','.join(values),
                station,
            ))
        except CouldntDecodeError as ex:
            log.error('[process_segment] {}'.format(ex))

    # 3. Reduce the raw matches to repetitions and report them.
    try:
        encontrados = {}
        for i in item_ids:
            r = [result for result in results if result["name"] == i]
            encontrados[i] = find_repetitions(r, segments_needed=segments_needed, calibration=calibration,)
        for elem_id in encontrados:
            for e in encontrados[elem_id]:
                for i in item['elementos']:
                    if i['id'] == elem_id and i['anuncio'] == e['ad']:
                        if 'encontrados' not in i:
                            i['encontrados'] = []
                        i['encontrados'].append(e)
                        break
        # NOTE(review): 12 looks like the expected number of recording
        # files per hour (5-minute files) — confirm against the recorder.
        item["archivos_perdidos"] = (12 - audios_counter) if audios_counter < 12 else 0
        response = send_results(item)
        log.info('[{}] API response: {}'.format(station, response))
    except ConnectionError as ex:
        log.error('[process_segment] {}'.format(str(ex)))
    except UserWarning as warn:
        log.warning(str(warn))


def find_repetitions(results, segments_needed=2, calibration=None):
    """Scan a confidence series for runs of consecutive segments above
    the threshold and return the matches that start each run.

    :param results: list of match dicts (must contain 'confidence').
    :param segments_needed: consecutive above-threshold segments that
        constitute one repetition (minimum 1).
    :param calibration: dict providing 'fallTolerance' and, in fixed
        mode, 'threshold'.
    :return: list of the result rows where each repetition started.
    """
    # fix: an empty series would divide by zero in average mode below;
    # it cannot contain repetitions either way.
    if not results:
        return []
    found_counter = 0
    found_down_counter = 0
    found_index = None
    expect_space = False
    expect_recover = False
    last_value_in_threshold_index = -1
    fall_tolerance = calibration['fallTolerance']
    found = []
    if threshold_mode == THRESHOLD_FIXED:
        threshold = calibration['threshold']
    elif threshold_mode == THRESHOLD_AVERAGE:
        values = [x['confidence'] for x in results]
        threshold = math.ceil(float(sum(values)) / float(len(values)))
    if segments_needed < 1:
        segments_needed = 1
    for index, result in enumerate(results):
        if not expect_space:
            if result['confidence'] >= threshold:
                found_counter += 1
                last_value_in_threshold_index = index
                if found_index is None:
                    found_index = index
                if expect_recover:
                    # the tolerated dip is counted back into the run
                    found_counter += found_down_counter
                    expect_recover = False
            elif fall_tolerance:
                if not expect_recover:
                    if last_value_in_threshold_index != -1:
                        # Only after at least one value has entered the
                        # threshold range may a low value be tolerated.
                        expect_recover = True
                        found_down_counter += 1
                    else:
                        pass
                else:
                    # A second consecutive value below the threshold after
                    # the one tolerated dip: the run is lost.
                    found_counter = 0
                    found_down_counter = 0
                    found_index = None
                    expect_recover = False
            else:
                found_counter = 0
                found_down_counter = 0
                found_index = None
                expect_recover = False
        else:
            # Waiting for the series to fall back below the threshold
            # before a new repetition may start.
            if result['confidence'] <= threshold:
                expect_space = False
        if found_counter >= segments_needed:
            found_row = results[found_index]
            found.append(found_row)
            found_counter = 0
            expect_space = True
    return found


def iterate_audios(dt, station, calibration=None):
    """Yield (path, basename, timestamp) for the station's recording
    files whose timestamps fall within the hour starting at *dt*
    (shifted by the calibration's hourlyOffset when present).
    """
    tm = time.mktime(dt.timetuple())
    if calibration and calibration['hourlyOffset']:
        hoffset = calibration['hourlyOffset']
        from_time = tm + hoffset
        to_time = tm + 3599 + hoffset
    elif AHEAD_TIME_AUDIO_TOLERANCE:
        # Conventional mode.
        # NOTE(review): the tolerance is added twice to to_time here
        # (once via from_time); confirm whether that is intentional.
        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    log.info('from {} to {}'.format(int(from_time), int(to_time)))
    cursor = db.cursor()
    cursor.execute((
        'select "filename", "timestamp" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
    ),
        (from_time, to_time, station,),
    )
    files = [row for row in cursor]
    cursor.close()
    for mp3 in files:
        mp3path, ts = mp3
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)


def cloud_download(ad_key=None):
    """Download the ad audio identified by *ad_key* into the system
    temporal folder and return (local_path, md5_hash).

    Results are memoized in cloud_cache; non-mp3 payloads are
    transcoded to mp3 with ffmpeg. Returns (None, None) when the
    download fails.
    """
    if ad_key in cloud_cache:
        # Already downloaded during this run: reuse the cached file
        # as long as it still exists on disk.
        filename, md5hash = cloud_cache[ad_key]
        if os.path.isfile(filename):
            return filename, md5hash
    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])
    if os.path.isfile(out_file):
        return out_file, md5_checksum(out_file)
    response = requests.get(url)
    if response.status_code == 200:
        # The expected md5 comes base64-encoded in the x-goog-hash
        # header, e.g. "crc32c=...,md5=...".
        hashes = response.headers['x-goog-hash']
        hashes = hashes.split(',')
        hashes = [h.split('=', 1) for h in hashes]
        hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
        md5sum = hashes['md5']
        with open(out_file, "wb") as fp:
            fp.write(response.content)
        tp = (out_file, md5sum,)
        # fix: the original read p.returncode right after Popen (always
        # None before the process is waited on) and compared it with the
        # string 'mp3\n', so the transcode branch always ran. The intent
        # is to check the codec name ffprobe prints on stdout.
        probe = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0',
                       '-show_entries', 'stream=codec_name', '-of',
                       'default=nokey=1:noprint_wrappers=1', out_file],
                      stdin=PIPE, stdout=PIPE, stderr=PIPE)
        probe_out, _ = probe.communicate()
        codec = probe_out.decode('utf-8', 'replace').strip()
        if codec != 'mp3':
            subprocess.call(['mv', out_file, out_file + '.old'])
            subprocess.call(
                ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-codec:a', 'libmp3lame',
                 '-qscale:a', '2', '-f', 'mp3', out_file])
            subprocess.call(['rm', '-rf', out_file + '.old'])
        cloud_cache[ad_key] = tp
        return tp
    # fix: callers unpack a 2-tuple; make the failure value explicit.
    return None, None


def download_file(file_path=None):
    """Re-download a missing recording file from cloud storage to
    *file_path* and flag it as not-uploaded in the local database."""
    file_path_cloud = file_path.replace("/var/fourier/", "")
    url = '{}/{}'.format(cloud_base_url, file_path_cloud)
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_path, "wb") as fp:
            fp.write(response.content)
        cursor = db.cursor()
        cursor.execute('update "file" set uploaded = 0 where filename = ?', (file_path,), )
        cursor.close()
        # fix: without a commit the UPDATE is lost when the connection closes.
        db.commit()


def get_pendings():
    """Fetch the pending schedule for this device from the API and
    return the decoded JSON payload."""
    url = 'https://api.fourier.audio/v1/calendario/pendiente?id={}'.format(config['device_id'], )
    headers = {
        'Authorization': 'Bearer {}'.format(config['apiSecret'], )
    }
    response = requests.get(url, headers=headers)
    return response.json()


def send_results(item):
    """POST the processed schedule *item* (with its matches) back to
    the API and return the response object."""
    url = 'https://api.fourier.audio/v1/calendario/resultado'
    headers = {
        'Authorization': 'Bearer {}'.format(config['apiSecret'], )
    }
    log.info('url: {}'.format(url))
    response = requests.post(url, json=item, headers=headers)
    return response


def md5_checksum(filename):
    """Return the hex md5 digest of *filename*, read in 4 KiB chunks."""
    hash_md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')