Gamaliel Espinoza Macedo 6 anni fa
parent
commit
44b13bd44e
3 ha cambiato i file con 91 aggiunte e 23 eliminazioni
  1. 56 0
      ondemand/calibration.py
  2. 33 22
      ondemand/service.py
  3. 2 1
      requirements.txt

+ 56 - 0
ondemand/calibration.py

@@ -0,0 +1,56 @@
+from firebase_admin import db as fbdb
+from threading import Thread
+from tornado.ioloop import IOLoop
+import logging as log
+
+
class Calibrations(object):
    """Per-station recognition calibration for one device.

    Calibrations are read from the Firebase Realtime Database node
    ``devices/<device_id>/calibrations/<station>``. The last value resolved
    for each station is remembered in ``self.items`` so it can be reused
    when the database cannot be reached.
    """

    # Values used for any setting the remote node does not provide.
    DEFAULT_TOLERANCE_RATE = 0.6
    DEFAULT_THRESHOLD = 10

    def __init__(self, device_id):
        """Bind the database references used for ``device_id``."""
        self.dev_ref = fbdb.reference('devices').child(device_id)
        self.calibrations_ref = self.dev_ref.child('calibrations')
        # Not read anywhere in this class; kept so the attributes exist.
        self.thread = None
        self.listener = None
        # station -> calibration dict most recently returned by get()
        self.items = {}

    def get(self, station):
        """Return the calibration dict for ``station``.

        The result always has the keys ``'tolerance'`` and ``'threshold'``.
        Resolution order:

        1. If the remote node can be fetched, its values are used, with the
           class defaults filling in any missing key (or all keys when the
           node is empty or absent). The result refreshes the cache.
        2. If the fetch fails with ``fbdb.ApiCallError``, the error is
           logged and the cached value for the station is returned when
           one exists; otherwise the class defaults are used.

        Previously a second call for a station that had a remote value hit
        no branch at all and raised ``UnboundLocalError``.
        """
        calref = self.calibrations_ref.child(station)

        try:
            remote = calref.get()
            fetched = True
        except fbdb.ApiCallError as err:
            log.error(err)
            remote = None
            fetched = False

        cached = self.items.get(station)
        if not fetched and cached:
            # Database unreachable: keep serving the last known value
            # instead of silently resetting the station to the defaults.
            return cached

        # Anything that is not a mapping (None, empty, scalar) is treated
        # as "no remote calibration".
        if not isinstance(remote, dict):
            remote = {}

        calibration = {
            'tolerance': remote.get('tolerance', self.DEFAULT_TOLERANCE_RATE),
            'threshold': remote.get('threshold', self.DEFAULT_THRESHOLD),
        }
        self.items[station] = calibration

        return calibration
+

+ 33 - 22
ondemand/service.py

@@ -7,8 +7,8 @@ from fourier.boxconfig import parse_config
 from fourier.dejavu.recognize import FilePerSecondRecognizer
 from datetime import datetime, timedelta
 from ondemand.endpoint import setup_endpoint
+from ondemand.calibration import Calibrations
 from fourier.dejavu import Dejavu
-from Queue import Queue, Empty
 from firebase_admin import credentials
 from firebase_admin import db as fbdb
 from binascii import hexlify
@@ -27,7 +27,12 @@ import time
 import sys
 import os
 
-log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
+if sys.version_info >= (3, 0):
+    from queue import Queue, Empty
+else:
+    from Queue import Queue, Empty
+
+log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
 
 AUDIOS_PATH = '/tmp'
 AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
@@ -66,9 +71,7 @@ dejavu = Dejavu({"database_type":"mem"})
 device_id = config['device_id']
 device_path = os.path.join(base_path, device_id)
 recognizer = FilePerSecondRecognizer
-"""calibrations = {}
-device_ref = fbdb.reference('devices').child(config['device_id'])
-calibrations_ref = device_ref.child('calibrations')"""
+calibrations = Calibrations(config['device_id'])
 
 # settings
 queue_mode = QUEUE_THREAD
@@ -141,7 +144,8 @@ def process_queue_with_threads():
                     thread = MultiAPI(target=process_segment,
                         args=(item,),
                         kwargs={
-                            'audios': [f for f in iterate_audios(date, station)]
+                            'audios': [f for f in iterate_audios(date, station)],
+                            'calibration': calibrations.get(station),
                         }
                     )
                     threads[index] = thread
@@ -159,23 +163,23 @@ def process_queue_with_threads():
     log.info('Finished thread processing')
     loop.add_callback(feed_queue)
 
-def process_segment(item, audios=None):
+def process_segment(item, audios=None, calibration=None):
     """ Procesa una hora de audio """
 
     station = item['station']
     date = dateutil.parser.parse(item['date'])
 
-    log.info('processing segment: {}'.format(item))
-
-    """# 0. Obtener la información de calibración
-    calibration = calibrations_ref.child(station)
-    print(calibration)"""
+    log.info('[process_segment] (th: {}, tl: {}) {}'.format(
+        calibration['threshold'],
+        calibration['tolerance'],
+        item
+    ))
 
     # 1. obtener el audio desde firebase
     #    y calcular su fingerprint.
     filename, md5hash = cloud_download(ad_key=item['ad'])
     if not filename:
-        log.info('ad file missing')
+        log.info('[process_segment] ad file missing')
         return
 
     # 1.1 Calcular el número de segmentos requeridos
@@ -187,14 +191,14 @@ def process_segment(item, audios=None):
             segments_needed * SEGMENTS_TOLERANCE_RATE
         ))
     except Exception as ex:
-        log.error('file {} is not an mp3'.format(filename))
+        log.error('[process_segment] file {} is not an mp3'.format(filename))
         log.error(str(ex))
         return
 
     try:
         dejavu.fingerprint_file(filename)
     except Exception as ex:
-        log.error('cannot fingerprint: {}'.format(ex))
+        log.error('[process_segment] cannot fingerprint: {}'.format(ex))
 
     """ Hay dos posibles escensarios al obtener los audios
       a. Los audios vienen por el parámetro "audios" de la
@@ -209,9 +213,15 @@ def process_segment(item, audios=None):
     audios_counter = 0
     results = []
     for path, name, ts in audios_iterable:
-        log.info('file: {}'.format(path))
+        short_path = os.path.join(station, name)
         audios_counter += os.path.isfile(path)
         values = []
+
+        if not os.path.isfile(path):
+            log.error('[process_segment] file not found: {}'\
+                .format(short_path))
+            continue
+
         for match in dejavu.recognize(recognizer, path, 5,
                                       ads_filter=[md5hash]):
             try:
@@ -223,15 +233,16 @@ def process_segment(item, audios=None):
                 values.append(str(match['confidence']))
 
             except KeyError as ex:
-                the_key = str(ex)
-                if the_key == 'confidence':
-                    log.warning('Invalid confidence')
+                # TODO: eliminar esta parte, ya no será necesario
+                if 'confidence' in str(ex):
+                    log.error('Invalid confidence')
+                    log.error(match)
                 else:
-                    log.warning(the_key)
+                    log.error(str(ex))
 
             ts += match['length'] / 1000
 
-        log.info('{2} ({3}/{0}) {1}'.format(
+        log.info('[process_segment] [{3}] {2} {0}) {1}'.format(
             os.path.split(path)[-1],
             ','.join(values),
             item['ad'],
@@ -249,7 +260,7 @@ def process_segment(item, audios=None):
             missing_files=(12 - audios_counter) \
                           if audios_counter < 12 else 0
         )
-        log.info('API response: {}'.format(response))
+        log.info('[{}] API response: {}'.format(station, response))
     except ConnectionError as ex:
         log.error(str(ex))
     except UserWarning as warn:

+ 2 - 1
requirements.txt

@@ -1,4 +1,5 @@
 tornado
 firebase_admin
 python-firebase
-requests
+requests
+mutagen