Gamaliel Espinoza vor 7 Jahren
Ursprung
Commit
5ac8227262
3 geänderte Dateien mit 167 neuen und 0 gelöschten Zeilen
  1. 0 0
      ondemand/__init__.py
  2. 4 0
      ondemand/__main__.py
  3. 163 0
      ondemand/service.py

+ 0 - 0
ondemand/__init__.py


+ 4 - 0
ondemand/__main__.py

@@ -0,0 +1,4 @@
+from __future__ import absolute_import
+from ondemand.service import loop
+
+loop.start()

+ 163 - 0
ondemand/service.py

@@ -0,0 +1,163 @@
+# -*- coding: utf8 -*-
+from __future__ import print_function
+from tornado.ioloop import IOLoop
+from tornado.web import Application
+from fourier.api.client import Client, ConnectionError
+from fourier.boxconfig import parse_config
+from fourier.dejavu.recognize import FilePerSecondRecognizer
+from datetime import datetime, timedelta
+from fourier.dejavu import Dejavu
+from firebase import firebase
+from Queue import Queue, Empty
+from firebase_admin import credentials
+from firebase_admin import db as fbdb
+import firebase_admin
+import requests
+import dateutil
+import sqlite3
+import time
+import sys
+import os
+
+AUDIOS_PATH = '/tmp'
+
+config = parse_config()
+queue = Queue()
+client = Client(config['device_id'],
+                #config['apiSecret'],
+                '7002a5a7592f624aea1364d64a11e39f',
+                base_url='http://localhost:41222')
+cloud_base_url = 'https://storage.googleapis.com/{}'\
+                .format(config['bucket'])
+base_path = config.get("basepath", "/var/fourier")
+fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
+firebase_admin.initialize_app(fb_credentials, config['firebase'])
+dejavu = Dejavu({"database_type":"mem"})
+device_id = config['device_id']
+device_path = os.path.join(base_path, device_id)
+recognizer = FilePerSecondRecognizer
+
+db_path = os.path.join(device_path, 'files.db')
+db = sqlite3.connect(db_path)
+
+def feed_queue():
+    try:
+        response = client.get_schedule_pending()
+        for item in response['items']:
+            queue.put(item)
+
+        if queue.qsize() > 0:
+            loop.add_callback(process_queue)
+        else:
+            loop.add_timeout(time.time() + 30, feed_queue)
+
+    except ConnectionError as ex:
+        print('cannot feed: {}, retryig later'.format(ex),
+              file=sys.stderr)
+        loop.add_timeout(time.time() + 15, feed_queue)
+
+    except Exception as ex:
+        """ Errores desconocidos """
+        print(ex, file=sys.stderr)
+        loop.add_timeout(time.time() + 60, feed_queue)
+
+def process_queue():
+    try:
+        item = queue.get(False)
+        process_segment(item)
+        loop.add_callback(process_queue)
+    except Empty:
+        loop.add_callback(feed_queue)
+
+def process_segment(item):
+    """ Procesa una hora de audio """
+
+    station = item['station']
+    date = dateutil.parser.parse(item['date'])
+
+    # 1. obtener el audio desde firebase
+    #    y calcular su fingerprint.
+    filename = cloud_download(ad_key=item['ad'])
+    if not filename:
+        print('ad file missing', file=sys.stderr)
+        return
+    
+    dejavu.fingerprint_file(filename)
+
+    # 2. leer la lista de archivos de la base
+    #    de datos local de la caja.
+    for path, name, ts in iterate_audios(date, station):
+        matches = []
+        for match in dejavu.recognize(recognizer, path, 5):
+            matches.append({
+                'confidence': match['confidence'],
+                'timestamp': ts
+            })
+            print("{} {} {}".format(
+                match['confidence'],
+                ts,
+                match['name'],
+            ))
+            ts += float(match["length"]) / 1000.0
+
+        try:
+            client.put_schedule_results(
+                item['schedule'],
+                item['id'],
+                matches
+            )
+        except ConnectionError as ex:
+            print(ex, file=sys.stderr)
+
+def iterate_audios(dt, station):
+    from_time = time.mktime(dt.timetuple())
+    to_time = from_time + 5 * 60
+
+    cursor = db.cursor()
+    cursor.execute((
+        'select "filename" '
+        'from "file" '
+        'where "timestamp" between ? and ? '
+            'and "station" = ? '
+        'order by "timestamp" asc'
+        ),   
+        (from_time, to_time, station, ),
+    )
+    files = []
+    for file in cursor:
+        files.append(file[0])
+
+    for mp3path in files:
+        mp3name = os.path.basename(mp3path)
+        mp3date = parse_date_filename(mp3_name)
+        if mp3date:
+            mp3ts = int(mp3date.strftime("%s"))
+            yield (mp3path, mp3name, mp3ts)
+
+def cloud_download(ad_key=None):
+    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
+    filename = os.path.basename(ad['path'])
+    out_file = os.path.join(AUDIOS_PATH, filename)
+    url = '{}/{}'.format(cloud_base_url, ad['path'])
+    response = requests.get(url)
+    
+    if response.status_code == 200:
+        with open(out_file, "wb") as fp:
+            fp.write(response.content)
+            return out_file
+    else:
+        print('no')
+        return None
+
+def parse_date_filename(val):
+    try:
+        return strptime(val[:19], '%Y-%m-%dT%H-%M-%S')
+    except ValueError:
+        return strptime(val[:19], '%Y-%m-%d-%H-%M-%S')
+
+app = Application()
+loop = IOLoop.current()
+loop.add_callback(feed_queue)
+
+if __name__ == '__main__':
+    loop.start()