# -*- coding: utf8 -*-
from __future__ import print_function
from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from fourier.dejavu import Dejavu
from Queue import Queue, Empty
from firebase_admin import credentials
from firebase_admin import db as fbdb
import firebase_admin
import requests
import dateutil
# ``import dateutil`` alone does not load the ``parser`` submodule; import it
# explicitly so ``dateutil.parser.parse`` below does not depend on some other
# module having imported it first.
import dateutil.parser
import sqlite3
import time
import sys
import os

# Folder where downloaded ad audio files are written.
AUDIOS_PATH = '/tmp'

# Seconds to wait on the ad download before giving up. The download runs
# inside an IOLoop callback, so a stalled request would otherwise block the
# whole worker.
DOWNLOAD_TIMEOUT = 60

config = parse_config()
queue = Queue()
client = Client(config['device_id'], config['apiSecret'])
cloud_base_url = 'https://storage.googleapis.com/{}'\
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
dejavu = Dejavu({"database_type": "mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer
db_path = os.path.join(device_path, 'files.db')
db = sqlite3.connect(db_path)


def feed_queue():
    """ Ask the server for pending scheduled work and push every returned
    item onto the in-memory queue.

    If the queue holds anything afterwards, processing is scheduled
    immediately; otherwise the server is polled again in 30 seconds.
    A connection error retries after 15 seconds, any other error after 60.
    """
    try:
        response = client.get_schedule_pending()
        for item in response['items']:
            queue.put(item)

        if queue.qsize() > 0:
            loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)

    except ConnectionError as ex:
        print('cannot feed: {}, retryig later'.format(ex),
              file=sys.stderr)
        loop.add_timeout(time.time() + 15, feed_queue)

    except Exception as ex:
        # Unknown errors: report them and back off for longer.
        print(ex, file=sys.stderr)
        loop.add_timeout(time.time() + 60, feed_queue)


def process_queue():
    """ Take the next item from the queue, without blocking, and process it.

    After an item is handled (successfully or not) this function is
    scheduled again; once the queue is empty control goes back to
    feed_queue.
    """
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as ex:
        # A failing item must not stop the worker: report it and move on.
        print(str(ex), file=sys.stderr)
        loop.add_callback(process_queue)


def _find_first_match(results, threshold=20, segments_needed=2):
    """ Return the first entry of the first run of `segments_needed`
    consecutive results whose confidence is above `threshold`, or None
    when there is no such run.
    """
    run_length = 0
    run_start = None

    for index, result in enumerate(results):
        if result['confidence'] > threshold:
            run_length += 1
            if run_start is None:
                run_start = index
        else:
            run_length = 0
            run_start = None

        if run_length >= segments_needed:
            # TODO: It will be necessary to improve this further so that
            # more than one occurrence of the audio can be recognized
            # within the same one-hour segment. The elapsed seconds also
            # matter; a time difference is needed.
            return results[run_start]

    return None


def process_segment(item):
    """ Process one hour of recorded audio for a scheduled item.

    `item` is a mapping read with the keys 'station', 'date', 'ad',
    'schedule' and 'id'. The ad audio is downloaded and fingerprinted,
    every local recording in the hour starting at item['date'] is run
    through the recognizer, and the per-match confidences together with
    the timestamp of the first detection (or None) are sent to the server.
    """
    station = item['station']
    date = dateutil.parser.parse(item['date'])

    # 1. Fetch the ad audio and compute its fingerprint.
    filename = cloud_download(ad_key=item['ad'])
    if not filename:
        print('ad file missing', file=sys.stderr)
        return

    try:
        dejavu.fingerprint_file(filename)
    except Exception as ex:
        # Best effort: recognition is still attempted after a failure here.
        print('cannot fingerprint: {}'.format(ex))

    # 2. Read the list of files from the local database and collect the
    #    confidence of every match along with its timestamp.
    results = []
    for path, _name, ts in iterate_audios(date, station):
        print(path)
        for match in dejavu.recognize(recognizer, path, 5):
            try:
                results.append({
                    'confidence': match['confidence'],
                    'timestamp': ts
                })
                print("{} {}".format(ts, match['confidence']))
            except KeyError as ex:
                print(ex, file=sys.stderr)

            # Advance the timestamp by the length of this match.
            # NOTE(review): 'length' is presumably in milliseconds; under
            # Python 2 this is integer division when it is an int --
            # confirm that truncation is intended.
            ts += match['length'] / 1000

    # 3. Look for success or failure.
    found = _find_first_match(results, threshold=20, segments_needed=2)

    try:
        client.put_schedule_results(
            item['schedule'],
            item['id'],
            results,
            found=found['timestamp'] if found else None,
        )
        print(len(results))
        print(results)
    except ConnectionError as ex:
        print(ex, file=sys.stderr)
    except UserWarning as warn:
        print(str(warn), file=sys.stderr)


def iterate_audios(dt, station):
    """ Given a datetime object and a station, yield the recorded files
    whose timestamp lies between the date and the date plus 3599 seconds
    (a one-hour window, both ends included) and whose station matches.

    Yields (path, file name, timestamp) tuples ordered by timestamp.
    """
    # NOTE(review): mktime interprets the time tuple as local time and any
    # tzinfo on `dt` is ignored -- confirm callers pass local datetimes.
    from_time = time.mktime(dt.timetuple())
    to_time = from_time + 3599
    print('from {} to {}'.format(from_time, to_time))

    cursor = db.cursor()
    try:
        cursor.execute((
            'select "filename", "timestamp" '
            'from "file" '
            'where "timestamp" between ? and ? '
            'and "station" = ? '
            'order by "timestamp" asc'
            ),
            (from_time, to_time, station, ),
        )
        # Load every row up front so the cursor can be closed before the
        # caller starts its (slow) processing of each file.
        rows = cursor.fetchall()
    finally:
        cursor.close()

    for mp3path, ts in rows:
        yield (mp3path, os.path.basename(mp3path), ts)


def cloud_download(ad_key=None):
    """ Given an ad key, download its audio file into AUDIOS_PATH so it
    can be processed.

    Returns the path of the downloaded file, or None when the ad record
    is missing, has no 'path', or the download does not answer HTTP 200.
    """
    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    try:
        ad_path = ad['path']
    except (TypeError, KeyError):
        # No such ad (get() gave None) or the record carries no path;
        # the caller reports a missing file.
        return None

    filename = os.path.basename(ad_path)
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad_path)

    response = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
    if response.status_code != 200:
        return None

    with open(out_file, "wb") as fp:
        fp.write(response.content)
    return out_file


app = Application()
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    loop.start()