# -*- coding: utf8 -*-
"""On-demand ad recognition worker.

Pulls pending schedule items from the local API client, downloads the ad
audio referenced by each item, fingerprints it with Dejavu and matches it
against the audio files listed in the device's local SQLite database.
Matches are reported back through the API client.
"""
from __future__ import print_function
from tornado.ioloop import IOLoop
from tornado.web import Application
from fourier.api.client import Client, ConnectionError
from fourier.boxconfig import parse_config
from fourier.dejavu.recognize import FilePerSecondRecognizer
from datetime import datetime, timedelta
from fourier.dejavu import Dejavu
from firebase import firebase
from Queue import Queue, Empty
from firebase_admin import credentials
from firebase_admin import db as fbdb
import firebase_admin
import requests
import dateutil
# ``import dateutil`` alone does not load the ``parser`` submodule that
# process_segment() relies on.
import dateutil.parser
import sqlite3
import time
import sys
import os

# Directory where downloaded ad audio files are written.
AUDIOS_PATH = '/tmp'

# Filename timestamp layouts accepted by parse_date_filename(), tried in order.
FILENAME_DATE_FORMATS = (
    '%Y-%m-%dT%H-%M-%S',
    '%Y-%m-%d-%H-%M-%S',
)

config = parse_config()
queue = Queue()

# NOTE(review): the API secret is hard-coded here; the original source had
# ``config['apiSecret']`` commented out in its place. Consider moving the
# secret back into configuration.
client = Client(config['device_id'],
                '7002a5a7592f624aea1364d64a11e39f',
                base_url='http://localhost:41222')

cloud_base_url = 'https://storage.googleapis.com/{}'\
    .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")

fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])

dejavu = Dejavu({"database_type": "mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
recognizer = FilePerSecondRecognizer

db_path = os.path.join(device_path, 'files.db')
db = sqlite3.connect(db_path)


def feed_queue():
    """Fetch pending schedule items and enqueue them for processing.

    When at least one item is queued, process_queue() is scheduled on the
    IOLoop. Otherwise this function reschedules itself: after 30 seconds
    when nothing is pending, 15 seconds after a ConnectionError and 60
    seconds after any other error.
    """
    try:
        response = client.get_schedule_pending()
        for item in response['items']:
            queue.put(item)

        if queue.qsize() > 0:
            loop.add_callback(process_queue)
        else:
            loop.add_timeout(time.time() + 30, feed_queue)

    except ConnectionError as ex:
        print('cannot feed: {}, retryig later'.format(ex),
              file=sys.stderr)
        loop.add_timeout(time.time() + 15, feed_queue)

    except Exception as ex:
        # Unknown errors: log them and retry later so the polling loop
        # keeps running.
        print(ex, file=sys.stderr)
        loop.add_timeout(time.time() + 60, feed_queue)


def process_queue():
    """Process one queued item, then schedule the next step.

    Takes a single item without blocking. After processing it, schedules
    itself again; once the queue is empty, hands control back to
    feed_queue().
    """
    # NOTE(review): only ``Empty`` is caught here, so an exception raised by
    # process_segment() leaves this callback before the next one is
    # scheduled. Confirm whether that is intended.
    try:
        item = queue.get(False)
        process_segment(item)
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)


def process_segment(item):
    """Match one scheduled ad against the locally recorded audio.

    ``item`` is a mapping read for the keys ``station``, ``date``, ``ad``,
    ``schedule`` and ``id``. The ad audio is downloaded and fingerprinted,
    then every local audio file returned by iterate_audios() for the item's
    date and station is run through the recognizer. The matches found for
    each file are sent with ``client.put_schedule_results``.

    Returns None. If the ad audio cannot be downloaded, a message is written
    to stderr and nothing else is done.
    """
    station = item['station']
    date = dateutil.parser.parse(item['date'])

    # 1. Fetch the ad audio (looked up through Firebase) and compute its
    #    fingerprint.
    filename = cloud_download(ad_key=item['ad'])
    if not filename:
        print('ad file missing', file=sys.stderr)
        return

    dejavu.fingerprint_file(filename)

    # 2. Walk the audio files listed in the device's local database.
    for path, name, ts in iterate_audios(date, station):
        matches = []
        for match in dejavu.recognize(recognizer, path, 5):
            matches.append({
                'confidence': match['confidence'],
                'timestamp': ts
            })
            print("{} {} {}".format(
                match['confidence'],
                ts,
                match['name'],
            ))
            # Advance the running timestamp by the match length, which is
            # divided by 1000 (presumably milliseconds to seconds).
            ts += float(match["length"]) / 1000.0

        # NOTE(review): results are sent once per audio file; this placement
        # was reconstructed from whitespace-mangled source, confirm it.
        try:
            client.put_schedule_results(
                item['schedule'],
                item['id'],
                matches
            )
        except ConnectionError as ex:
            # Best effort: log the failure and continue with the next file.
            print(ex, file=sys.stderr)


def iterate_audios(dt, station):
    """Yield local audio files for ``station`` starting at ``dt``.

    Queries the ``file`` table for rows whose ``timestamp`` lies between
    ``dt`` and five minutes after it, ordered by timestamp.

    Yields ``(path, basename, timestamp)`` tuples, where ``timestamp`` is an
    integer derived from the date encoded in the file name. Files whose name
    does not start with a recognised date are skipped.
    """
    from_time = time.mktime(dt.timetuple())
    to_time = from_time + 5 * 60

    cursor = db.cursor()
    cursor.execute((
        'select "filename" '
        'from "file" '
        'where "timestamp" between ? and ? '
        'and "station" = ? '
        'order by "timestamp" asc'
        ),
        (from_time, to_time, station, ),
    )

    # Read every row up front so the cursor is not left mid-iteration while
    # the consumer works on each yielded file.
    files = [row[0] for row in cursor]

    for mp3path in files:
        mp3name = os.path.basename(mp3path)
        mp3date = parse_date_filename(mp3name)
        if mp3date:
            # time.mktime matches how from_time is computed above and avoids
            # the non-portable strftime("%s") extension.
            mp3ts = int(time.mktime(mp3date.timetuple()))
            yield (mp3path, mp3name, mp3ts)


def cloud_download(ad_key=None):
    """Download the audio file of the ad stored under ``ads/<ad_key>``.

    The ad record is read from the Firebase database and its ``path`` is
    fetched from the configured storage bucket into AUDIOS_PATH.

    Returns the local file path on success. Returns None when the ad record
    is missing or has no ``path``, when the HTTP request fails, or when the
    response status is not 200.
    """
    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    if not ad or 'path' not in ad:
        # No usable record for this key; the caller treats None as
        # "ad file missing".
        return None

    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])

    try:
        response = requests.get(url)
    except requests.RequestException as ex:
        print(ex, file=sys.stderr)
        return None

    if response.status_code == 200:
        with open(out_file, "wb") as fp:
            fp.write(response.content)
        return out_file
    else:
        print('no')
        return None


def parse_date_filename(val):
    """Parse the timestamp encoded in the first 19 characters of ``val``.

    Each layout in FILENAME_DATE_FORMATS is tried in order.

    Returns a ``datetime``, or None when no layout matches.
    """
    prefix = val[:19]
    for fmt in FILENAME_DATE_FORMATS:
        try:
            return datetime.strptime(prefix, fmt)
        except ValueError:
            continue
    return None


app = Application()
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    loop.start()