Selaa lähdekoodia

More stuff about threaded segment processing

Gamaliel Espinoza Macedo 6 vuotta sitten
vanhempi
commit
7d509a4472
1 muutettua tiedostoa jossa 43 lisäystä ja 1 poistoa
  1. 43 1
      ondemand/service.py

+ 43 - 1
ondemand/service.py

@@ -13,6 +13,8 @@ from firebase_admin import credentials
 from firebase_admin import db as fbdb
 from binascii import hexlify
 from base64 import b64decode
+from threading import Thread
+from argparse import ArgumentParser
 import logging as log
 import firebase_admin
 import mutagen.mp3
@@ -27,6 +29,16 @@ log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
 
 AUDIOS_PATH = '/tmp'
 AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
+MAX_SEGMENT_THREADS = 2
+
+# Modos de procesamiento de queue
+#  - QUEQUE_SINGLE: procesa solo un segmento a la vez
+#  - QUEUE_THREAD:  inicia un hilo para cada segmento
+# Por default se usará el threaded.
+# TOOD: hacerlo configurable por medio de argumentos
+#       de ejecución.
+QUEUE_SINGLE = 1
+QUEUE_THREAD = 2
 
 config = parse_config()
 queue = Queue()
@@ -41,6 +53,7 @@ dejavu = Dejavu({"database_type":"mem"})
 device_id = config['device_id']
 device_path = os.path.join(base_path, device_id)
 recognizer = FilePerSecondRecognizer
+queue_mode = QUEUE_THREAD
 
 db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
 db = sqlite3.connect(db_path)
@@ -62,7 +75,10 @@ def feed_queue():
                     )
 
         if queue.qsize() > 0:
-            loop.add_callback(process_queue)
+            if queue_mode == QUEUE_THREAD:
+                loop.add_callback(process_queue_with_threads)
+            else:
+                loop.add_callback(process_queue)
         else:
             loop.add_timeout(time.time() + 30, feed_queue)
 
@@ -89,6 +105,32 @@ def process_queue():
         log.error(ex)
         loop.add_callback(process_queue)
 
+def process_queue_with_threads():
+    threads = [None] * MAX_SEGMENT_THREADS
+    is_drained = False
+
+    log.info('Starting thread processing')
+
+    while True:
+        for index, t in enumerate(threads):
+            if not t:
+                try:
+                    item = queue.get(False)
+                    thread = Thread(process_segment, args=(item,))
+                    thread.start()
+                except Empty:
+                    is_drained = True
+
+            elif not t.is_alive():
+                threads[index] = None
+
+        if is_drained:
+            if threads.count(None) == MAX_SEGMENT_THREADS:
+                break
+
+    log.info('Finished thread processing')
+    loop.add_callback(feed_queue)
+
 def process_segment(item):
     """ Procesa una hora de audio """