ソースを参照

Conflicto resuelto

Gamaliel Espinoza 6 年 前
コミット
e5991a6a68
共有6 個のファイルを変更した224 個の追加37 個の削除を含む
  1. 4 0
      ondemand/__init__.py
  2. 87 0
      ondemand/calibration.py
  3. 94 34
      ondemand/service.py
  4. 2 1
      requirements.txt
  5. 4 1
      setup.py
  6. 33 1
      tests/test_general.py

+ 4 - 0
ondemand/__init__.py

@@ -1 +1,5 @@
# Package version. The merge left raw conflict markers here (rc1 on HEAD,
# rc3 incoming); they are a syntax error, so resolve to the newer release
# candidate.
__version__ = '1.3.2-rc3'

+ 87 - 0
ondemand/calibration.py

@@ -0,0 +1,87 @@
+from firebase_admin import db as fbdb
+from threading import Thread
+from tornado.ioloop import IOLoop
+import logging as log
+
+
class Calibrations(object):
    """Per-station calibration settings with defaults and a local cache.

    Settings are fetched either through ``client.get_calibrations`` (when a
    client is given) or from the ``devices/<device_id>/calibrations`` database
    reference. Every value returned by ``get`` is remembered in ``items`` so
    the last known calibration can be reused when the remote lookup fails.
    """

    TOLERANCE_RATE = 0.6
    THRESHOLD = 10
    FALL_TOLERANCE = 0
    SEGMENT_SIZE = 5
    HOURLY_OFFSET = -5

    # SEGMENT_SIZE accepted values:
    #   n       = any number of seconds
    #   ceil    = the audio duration in segments of 5
    #   integer = the exact duration of the audio

    def __init__(self, device_id, client=None):
        """Bind to the device's calibration reference and start an empty cache.

        :param device_id: key of the device under the ``devices`` reference.
        :param client: optional object exposing ``get_calibrations(station=)``;
            when given it is used instead of the database reference.
        """
        self.client = client
        self.dev_ref = fbdb.reference('devices').child(device_id)
        self.calibrations_ref = self.dev_ref.child('calibrations')
        self.thread = None
        self.listener = None
        self.items = {}

    def _defaults(self):
        """Return a fresh dict holding the class-level default calibration."""
        return {
            'tolerance': self.TOLERANCE_RATE,
            'threshold': self.THRESHOLD,
            'fallTolerance': self.FALL_TOLERANCE,
            'segmentSize': self.SEGMENT_SIZE,
            'hourlyOffset': self.HOURLY_OFFSET,
        }

    def get(self, station):
        """Return the calibration dict for ``station`` and cache it.

        Resolution order:
          1. remote value, with missing keys filled from the defaults;
          2. the previously cached value, when the remote one is unavailable;
          3. the class defaults, when nothing else is known.

        :param station: station key to look up.
        :returns: dict with ``tolerance``, ``threshold``, ``fallTolerance``,
            ``segmentSize`` and ``hourlyOffset`` keys.
        """
        remote = self.get_remote(station)
        cached = self.items.get(station)

        if remote:
            calibration = {
                key: remote.get(key, default)
                for key, default in self._defaults().items()
            }
        elif cached:
            # Remote lookup failed or returned nothing: keep the last known
            # calibration instead of silently resetting it to the defaults.
            calibration = cached
        else:
            calibration = self._defaults()

        self.items[station] = calibration

        return calibration

    def get_remote(self, station):
        """Fetch the stored calibration for ``station``.

        Any exception raised by the lookup is logged and reported as ``None``
        so that callers can fall back to cached or default values.

        :param station: station key to look up.
        :returns: whatever the client or database reference returns, or
            ``None`` on error.
        """
        try:
            if self.client:
                return self.client.get_calibrations(station=station)

            return self.calibrations_ref.child(station).get()

        except Exception as err:
            log.error('[Calibrations.get_remote] {}'.format(err))
            return None

+ 94 - 34
ondemand/service.py

@@ -7,6 +7,7 @@ from fourier.boxconfig import parse_config
 from fourier.dejavu.recognize import FilePerSecondRecognizer
 from datetime import datetime, timedelta
 from ondemand.endpoint import setup_endpoint
+from ondemand.calibration import Calibrations
 from fourier.dejavu import Dejavu
 from firebase_admin import credentials
 from firebase_admin import db as fbdb
@@ -18,6 +19,7 @@ from argparse import ArgumentParser
 import logging as log
 import firebase_admin
 import mutagen.mp3
+import OpenSSL.SSL
 import requests
 import dateutil
 import sqlite3
@@ -31,7 +33,7 @@ if sys.version_info >= (3, 0):
 else:
     from Queue import Queue, Empty
 
-log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
+log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
 
 AUDIOS_PATH = '/tmp'
 AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
@@ -70,9 +72,8 @@ dejavu = Dejavu({"database_type":"mem"})
 device_id = config['device_id']
 device_path = os.path.join(base_path, device_id)
 recognizer = FilePerSecondRecognizer
-calibrations = {}
 device_ref = fbdb.reference('devices').child(config['device_id'])
-calibrations_ref = device_ref.child('calibrations')
+calibrations = Calibrations(config['device_id'], client=client)
 
 # settings
 queue_mode = QUEUE_THREAD
@@ -113,6 +114,7 @@ def feed_queue():
         """ Errores desconocidos """
         log.error('[feed_queue] {}'.format(ex))
         loop.add_timeout(time.time() + 60, feed_queue)
+        raise ex
 
 def process_queue():
     """ Try to the next item in a queue and start
@@ -141,18 +143,33 @@ def process_queue_with_threads():
                     item = queue.get(False)
                     station = item['station']
                     date = dateutil.parser.parse(item['date'])
+                    calibration = calibrations.get(station)
+
+                    audios = [f for f in iterate_audios(
+                        date, station,
+                        calibration=calibration
+                    )]
 
                     thread = MultiAPI(target=process_segment,
                         args=(item,),
                         kwargs={
-                            'audios': [f for f in iterate_audios(date, station)]
+                            'audios': audios,
+                            'calibration': calibration,
                         }
                     )
                     threads[index] = thread
                     thread.start()
+
                 except Empty:
                     is_drained = True
 
+                except Exception as err:
+                    log.error('[process_queue_with_threads] [{}] {}'.format(
+                        station,
+                        err,
+                    ))
+                    continue
+
             elif not t.is_alive():
                 threads[index] = None
 
@@ -163,42 +180,64 @@ def process_queue_with_threads():
     log.info('Finished thread processing')
     loop.add_callback(feed_queue)
 
-def process_segment(item, audios=None):
+def process_segment(item, audios=None, calibration=None):
     """ Procesa una hora de audio """
 
     station = item['station']
-    date = dateutil.parser.parse(item['date'])
 
-    log.info('processing segment: {}'.format(item))
+    if not calibration:
+        calibration = calibrations.get(station)
 
-    # 0. Obtener la información de calibración
-    calibration = calibrations_ref.child(station)
-    print(calibration)
+    tolerance = calibration['tolerance']
+    date = dateutil.parser.parse(item['date'])
+    segment_size = calibration['segmentSize']
+    audio_length = 0
+
+    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}'\
+        .format(
+            calibration['threshold'],
+            calibration['tolerance'],
+            calibration['fallTolerance'],
+            calibration['segmentSize'],
+            calibration['hourlyOffset'],
+            item,
+        )
+    )
 
     # 1. obtener el audio desde firebase
     #    y calcular su fingerprint.
-    filename, md5hash = cloud_download(ad_key=item['ad'])
-    if not filename:
-        log.info('ad file missing')
+    try:
+        filename, md5hash = cloud_download(ad_key=item['ad'])
+        if not filename:
+            log.info('[process_segment] ad file missing')
+            return
+    except Exception as err:
+        log.error('[process_segment] [{}] {}'.format(station, err))
         return
 
     # 1.1 Calcular el número de segmentos requeridos
     # de acuerdo a la duración total del audio.
     try:
         audio = mutagen.mp3.MP3(filename)
-        segments_needed = int(round(float(audio.info.length) / float(5)))
-        segments_needed = int(round(
-            segments_needed * SEGMENTS_TOLERANCE_RATE
-        ))
+        audio_length = audio.info.length
+
+        if segment_size == 'integer':
+            segment_size = int(audio_length)
+        elif segment_size == 'ceil':
+            segment_size = int(math.ceil(audio_length / 5)) * 5
+
+        segments_needed = int(round(float(audio_length) / float(segment_size)))
+        segments_needed = int(round(segments_needed * tolerance))
+
     except Exception as ex:
-        log.error('file {} is not an mp3'.format(filename))
+        log.error('[process_segment] file {} is not an mp3'.format(filename))
         log.error(str(ex))
         return
 
     try:
         dejavu.fingerprint_file(filename)
     except Exception as ex:
-        log.error('cannot fingerprint: {}'.format(ex))
+        log.error('[process_segment] cannot fingerprint: {}'.format(ex))
 
     """ Hay dos posibles escensarios al obtener los audios
       a. Los audios vienen por el parámetro "audios" de la
@@ -206,8 +245,12 @@ def process_segment(item, audios=None):
       b. Los audios se obtienen directamente de la base
          de datos en modo de cursor.
     """
-    audios_iterable = audios if audios \
-        else iterate_audios(date, station)
+    try:
+        audios_iterable = audios if audios \
+            else iterate_audios(date, station, calibration=calibration)
+    except sqlite3.OperationalError as err:
+        log.error('[process_segment] [{}] {}'.format(station, err))
+        return
 
     # 2. Read the list of files from local database
     audios_counter = 0
@@ -215,10 +258,16 @@ def process_segment(item, audios=None):
 
     try:
         for path, name, ts in audios_iterable:
-            log.info('file: {}'.format(path))
+            short_path = os.path.join(station, name)
             audios_counter += os.path.isfile(path)
             values = []
-            for match in dejavu.recognize(recognizer, path, 5,
+
+            if not os.path.isfile(path):
+                log.error('[process_segment] file not found: {}'\
+                    .format(short_path))
+                continue
+
+            for match in dejavu.recognize(recognizer, path, segment_size,
                                           ads_filter=[md5hash]):
                 results.append({
                     'confidence': match['confidence'],
@@ -229,7 +278,7 @@ def process_segment(item, audios=None):
 
                 ts += match['length'] / 1000
 
-            log.info('{2} ({3}/{0}) {1}'.format(
+            log.info('[process_segment] [{3}] {2} {0}) {1}'.format(
                 os.path.split(path)[-1],
                 ','.join(values),
                 item['ad'],
@@ -245,27 +294,29 @@ def process_segment(item, audios=None):
             item['id'],
             None, # TODO: send results again
             found=find_repetitions(results,
-                segments_needed=segments_needed
+                segments_needed=segments_needed,
+                calibration=calibration,
             ),
             missing_files=(12 - audios_counter) \
                           if audios_counter < 12 else 0
         )
-        log.info('API response: {}'.format(response))
+        log.info('[{}] API response: {}'.format(station, response))
     except ConnectionError as ex:
-        log.error(str(ex))
+        log.error('[process_segment] {}'.format(str(ex)))
     except UserWarning as warn:
         log.warning(str(warn))
 
-def find_repetitions(results, segments_needed=2):
+def find_repetitions(results, segments_needed=2, calibration=None):
     found_counter = 0
     found_index = None
     expect_space = False
     expect_recover = False
     last_value_in_threshold_index = -1
+    fall_tolerance = calibration['fallTolerance']
     found = []
 
     if threshold_mode == THRESHOLD_FIXED:
-        threshold = THRESHOLD
+        threshold = calibration['threshold']
     elif threshold_mode == THRESHOLD_AVERAGE:
         values = [x['confidence'] for x in results]
         threshold = math.ceil(float(sum(values)) / float(len(values)))
@@ -283,7 +334,7 @@ def find_repetitions(results, segments_needed=2):
                 if expect_recover:
                     expect_recover = False
 
-            elif FALL_TOLERANCE_SEGMENTS:
+            elif fall_tolerance:
                 if not expect_recover:
                     if last_value_in_threshold_index != -1:
                         """ Solo cuando ya haya entrado por lo menos
@@ -316,14 +367,23 @@ def find_repetitions(results, segments_needed=2):
 
     return found
 
-def iterate_audios(dt, station):
+def iterate_audios(dt, station, calibration=None):
     """ Given a datetime object and an station,
     iterate a list of files that are between
     the the date and itself plus 5 minutes;
     station must match too """
-    from_time = time.mktime(dt.timetuple()) \
-              - AHEAD_TIME_AUDIO_TOLERANCE
-    to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
+
+    tm = time.mktime(dt.timetuple())
+
+    if calibration and calibration['hourlyOffset']:
+        hoffset = calibration['hourlyOffset']
+        from_time = tm + hoffset
+        to_time = tm + 3599 + hoffset
+
+    elif AHEAD_TIME_AUDIO_TOLERANCE:
+        """ Conventional mode """
+        from_time = tm + AHEAD_TIME_AUDIO_TOLERANCE
+        to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
 
     log.info('from {} to {}'.format(int(from_time), int(to_time)))
 

+ 2 - 1
requirements.txt

@@ -1,4 +1,5 @@
 tornado
 firebase_admin
 python-firebase
-requests
+requests
+mutagen

+ 4 - 1
setup.py

@@ -13,7 +13,10 @@ setup(
   description = 'Fourier ondemand service',
   long_description = 'You can use it for do maintenance stuff',
   scripts = [join(binpath, 'fourier-ondemand')],
-  install_requires=['pyaudio', 'psutil', 'tornado', 'firebase_admin', 'mutagen'],
+  install_requires=['pyaudio', 'psutil', 'tornado',
+    'firebase_admin>=2.16', 'mutagen',
+    'requests>=2.18'
+  ],
   author = 'Gamaliel Espinoza M.',
   author_email = 'gamaliel.espinoza@gmail.com',
   url = 'https://git.miralo.xyz/AudioValid/fourier-ondemand',

+ 33 - 1
tests/test_general.py

@@ -6,7 +6,7 @@ import time
 
 
 class TestGeneral(unittest.TestCase):
-
+    """
     def test_find_repetitions(self):
         found = find_repetitions([
             {'timestamp': 1519769650, 'confidence': 5},
@@ -49,6 +49,38 @@ class TestGeneral(unittest.TestCase):
         filename, md5hash = cloud_download(ad_key='-L6EoklO90painUCd7df')
         print(filename, md5hash)
         pass
+    """
+
+    def test_repetitions_new_model(self):
+        found = find_repetitions([
+            {'timestamp': 1519769650, 'confidence': 5},
+            {'timestamp': 1519769655, 'confidence': 1},
+            {'timestamp': 1519769660, 'confidence': 1},
+            {'timestamp': 1519769665, 'confidence': 5},
+            {'timestamp': 1519769670, 'confidence': 3},
+            {'timestamp': 1519769675, 'confidence': 3},
+            {'timestamp': 1519769680, 'confidence': 2},
+            {'timestamp': 1519769685, 'confidence': 3},
+            {'timestamp': 1519769690, 'confidence': 9},
+            {'timestamp': 1519769695, 'confidence': 1},
+            {'timestamp': 1519769700, 'confidence': 2},
+            {'timestamp': 1519769705, 'confidence': 2},
+            {'timestamp': 1519769710, 'confidence': 2},
+            {'timestamp': 1519769720, 'confidence': 3},
+            {'timestamp': 1519769725, 'confidence': 3},
+            {'timestamp': 1519769730, 'confidence': 66},
+            {'timestamp': 1519769735, 'confidence': 12},
+            {'timestamp': 1519769735, 'confidence': 30},
+            {'timestamp': 1519769740, 'confidence': 8},
+            {'timestamp': 1519769745, 'confidence': 34},
+            {'timestamp': 1519769750, 'confidence': 3},
+            {'timestamp': 1519769755, 'confidence': 3},
+            {'timestamp': 1519769760, 'confidence': 3},
+            {'timestamp': 1519769765, 'confidence': 6},
+            {'timestamp': 1519769770, 'confidence': 4},
+        ])
+
+        print(found)
 
 
 if __name__ == '__main__':