# -*- coding: utf8 -*-
from __future__ import print_function

from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from ondemand.endpoint import setup_endpoint
from fourier.dejavu import Dejavu
from Queue import Queue, Empty
from firebase_admin import credentials
from firebase_admin import db as fbdb
from binascii import hexlify
from base64 import b64decode
import firebase_admin
import requests
import dateutil.parser
import sqlite3
import time
import sys
import os

AUDIOS_PATH = '/tmp'
AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds

config = parse_config()
queue = Queue()
client = Client(config['device_id'], config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}' \
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
dejavu = Dejavu({"database_type": "mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer  # class, instantiated by dejavu.recognize
db_path = os.path.join(device_path, 'files.db')
db = sqlite3.connect(db_path)
cloud_cache = {}


def feed_queue():
    """ Ask the server for pending scheduled work and add each
    item to the in-memory queue. """
    try:
        response = client.get_schedule_pending()
        for item in response['items']:
            queue.put(item)
        if queue.qsize() > 0:
            loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)
    except ConnectionError as ex:
        print('cannot feed: {}, retrying later'.format(ex),
              file=sys.stderr)
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as ex:
        # Unknown errors: log them and retry in a minute.
        print(ex, file=sys.stderr)
        loop.add_timeout(time.time() + 60, feed_queue)


def process_queue():
    """ Take the next item from the queue and process it. On
    success, schedule this function again; once the queue is
    empty, go back to feeding it. """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        print(str(ex), file=sys.stderr)
        loop.add_callback(process_queue)
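
# The x-goog-hash decoding inside cloud_download (below) is fairly
# dense, so this never-called sketch shows the same transformation on
# an invented header value; only the md5 pair is real (it is the
# base64/hex digest pair of the string "foo").
def _example_parse_goog_hash(
        header='crc32c=AAAAAA==, md5=rL0Y20zC+Fzt72VPzMSk2A=='):
    # 'crc32c=AAAAAA==, md5=...' -> [['crc32c', 'AAAAAA=='], ...]
    pairs = [h.split('=', 1) for h in header.split(',')]
    # base64 digests -> hex digests, keyed by algorithm name:
    # {'crc32c': '00000000', 'md5': 'acbd18db4cc2f85cedef654fccc4a4d8'}
    return {name.strip(): hexlify(b64decode(value))
            for name, value in pairs}
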
def process_segment(item):
    """ Process one hour of audio from a station, looking for the
    scheduled ad. """
    station = item['station']
    date = dateutil.parser.parse(item['date'])

    # 1. Download the ad audio from Firebase and compute its
    #    fingerprint.
    filename, md5hash = cloud_download(ad_key=item['ad'])
    if not filename:
        print('ad file missing', file=sys.stderr)
        return

    try:
        dejavu.fingerprint_file(filename)
    except Exception as ex:
        print('cannot fingerprint: {}'.format(ex), file=sys.stderr)

    # 2. Read the list of files from the local database and run the
    #    recognizer over each one.
    results = []
    for path, name, ts in iterate_audios(date, station):
        print(path)
        for match in dejavu.recognize(recognizer, path, 5,
                                      ads_filter=[md5hash]):
            try:
                results.append({
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset'],
                })
                print("{} {}".format(ts, match['confidence']))
            except KeyError as ex:
                print(ex, file=sys.stderr)
            ts += match['length'] / 1000

    # 3. Report the matches back to the server.
    try:
        response = client.put_schedule_results(
            item['schedule'],
            item['id'],
            None,  # TODO: send results again
            found=find_repetitions(results),
        )
        print("api response: {}".format(response))
    except ConnectionError as ex:
        print(ex, file=sys.stderr)
    except UserWarning as warn:
        print(str(warn), file=sys.stderr)


def find_repetitions(results):
    """ Reduce per-segment matches to the timestamps where the ad is
    considered found: at least `segments_needed` consecutive segments
    above `threshold`, separated by a low-confidence gap before the
    next repetition can start. """
    found_counter = 0
    found_index = None
    segments_needed = 2
    seconds_needed = 9  # NOTE: currently unused
    threshold = 20
    expect_space = False
    found = []

    for index, result in enumerate(results):
        if not expect_space:
            if result['confidence'] > threshold:
                found_counter += 1
                if found_index is None:
                    found_index = index
            else:
                found_counter = 0
                found_index = None
        else:
            if result['confidence'] <= threshold:
                expect_space = False

        if found_counter >= segments_needed:
            found.append(results[found_index]['timestamp'])
            found_counter = 0
            found_index = None
            expect_space = True

    return found


def iterate_audios(dt, station):
    """ Given a datetime and a station, iterate over the recorded
    files whose timestamps fall between that date and one hour
    later (with a small head tolerance); the station must match
    too. """
    from_time = time.mktime(dt.timetuple()) \
        - AHEAD_TIME_AUDIO_TOLERANCE
    to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    print('from {} to {}'.format(from_time, to_time))
    cursor = db.cursor()
    cursor.execute((
        'select "filename", "timestamp" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
    ), (from_time, to_time, station,))
    files = cursor.fetchall()
    cursor.close()

    for mp3path, ts in files:
        mp3name = os.path.basename(mp3path)
        yield (mp3path, mp3name, ts)


def cloud_download(ad_key=None):
    """ Given an ad key, download the ad file to the system's
    temporal folder so it can be fingerprinted. Returns the local
    filename and its md5 hash, or (None, None) on failure. """
    if ad_key in cloud_cache:
        # Already downloaded once: take it from the cloud_cache
        # dictionary instead of downloading it again.
        filename, md5hash = cloud_cache[ad_key]
        if os.path.isfile(filename):
            return filename, md5hash

    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])

    response = requests.get(url)
    if response.status_code == 200:
        # Cloud Storage sends the file's checksums in the x-goog-hash
        # header as comma-separated 'name=base64digest' pairs.
        hashes = response.headers['x-goog-hash']
        hashes = hashes.split(',')
        hashes = [h.split('=', 1) for h in hashes]
        hashes = {h[0].strip(): hexlify(b64decode(h[1]))
                  for h in hashes}
        md5sum = hashes['md5']

        with open(out_file, "wb") as fp:
            fp.write(response.content)

        tp = (out_file, md5sum)
        cloud_cache[ad_key] = tp
        return tp

    # Let the caller detect the failed download.
    return None, None


app = setup_endpoint(queue=queue)
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        loop.start()
    except KeyboardInterrupt:
        print('process killed')
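
# For reference, the keys this script reads from the parse_config()
# result are: device_id, apiSecret, bucket, firebase, and the optional
# basepath (default /var/fourier). A hypothetical config, with
# invented values, might look like:
#
#   {
#       "device_id": "box-0001",
#       "apiSecret": "s3cr3t",
#       "bucket": "fourier-ads",
#       "basepath": "/var/fourier",
#       "firebase": {"databaseURL": "https://<project>.firebaseio.com"}
#   }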