123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # -*- coding: utf8 -*-
- from __future__ import print_function, absolute_import
- from tornado.ioloop import IOLoop
- from tornado.web import Application
- from fourier.api.client import Client, ConnectionError
- from fourier.boxconfig import parse_config
- from fourier.dejavu.recognize import FilePerSecondRecognizer
- from datetime import datetime, timedelta
- from ondemand.endpoint import setup_endpoint
- from fourier.dejavu import Dejavu
- from Queue import Queue, Empty
- from firebase_admin import credentials
- from firebase_admin import db as fbdb
- from binascii import hexlify
- from base64 import b64decode
- import firebase_admin
- import mutagen.mp3
- import requests
- import dateutil
- import sqlite3
- import time
- import sys
- import os
# Directory where downloaded ad audio files are written.
AUDIOS_PATH = '/tmp'
# Seconds subtracted from the requested start time when querying audio files.
AHEAD_TIME_AUDIO_TOLERANCE = 2

# Box configuration and the identity of this device.
config = parse_config()
device_id = config['device_id']

# In-memory queue of pending schedule items and the API client that feeds it.
queue = Queue()
client = Client(device_id, config['apiSecret'])

# Base URL of the storage bucket the ads are downloaded from.
cloud_base_url = 'https://storage.googleapis.com/{}'.format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")

# Firebase is initialised once at import time with the service account key.
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])

# Fingerprint engine and the recognizer class handed to it.
dejavu = Dejavu({"database_type": "mem"})
recognizer = FilePerSecondRecognizer

# Local SQLite database listing the audio files of this device.
device_path = os.path.join(base_path, device_id)
db_path = os.path.join(device_path, 'files.db')
db = sqlite3.connect(db_path)

# Maps an ad key to the (local path, md5 hex digest) of its downloaded file.
cloud_cache = {}
def feed_queue():
    """Fetch pending scheduled work from the server into the memory queue.

    When the queue holds items afterwards, processing is scheduled right
    away; otherwise this function is scheduled again in 30 seconds. A
    connection error reschedules it in 15 seconds and any other error in
    60 seconds.
    """
    try:
        pending = client.get_schedule_pending()
        for entry in pending['items']:
            queue.put(entry)

        if queue.qsize() <= 0:
            # Nothing to do yet: poll the server again later.
            loop.add_timeout(time.time() + 30, feed_queue)
        else:
            loop.add_callback(process_queue)
    except ConnectionError as err:
        message = 'cannot feed: {}, retryig later'.format(err)
        print(message, file=sys.stderr)
        loop.add_timeout(time.time() + 15, feed_queue)
    except Exception as err:
        # Unknown errors: report them and retry after a longer pause.
        print(err, file=sys.stderr)
        loop.add_timeout(time.time() + 60, feed_queue)
def process_queue():
    """Take the next item from the queue, if any, and process it.

    After an item has been handled, whether it succeeded or raised, this
    function is scheduled again. Once the queue is empty, control goes back
    to feed_queue.
    """
    try:
        # Non-blocking read: raises Empty when there is nothing queued.
        process_segment(queue.get_nowait())
        loop.add_callback(process_queue)
    except Empty:
        loop.add_callback(feed_queue)
    except Exception as err:
        # A failing item is reported and skipped; the rest still run.
        print(str(err), file=sys.stderr)
        loop.add_callback(process_queue)
def process_segment(item):
    """Process one hour of audio for a scheduled item.

    The ad referenced by ``item['ad']`` is downloaded and fingerprinted,
    the recognizer is run over every audio file that iterate_audios yields
    for ``item['station']`` starting at ``item['date']``, and the
    repetitions found are reported through the API client.

    ``item`` is a mapping read for the keys 'station', 'date', 'ad',
    'schedule' and 'id'. Nothing is returned; problems are printed.
    """
    # A bare `import dateutil` does not guarantee that the `parser`
    # submodule is loaded, so import it explicitly before using it.
    import dateutil.parser

    station = item['station']
    date = dateutil.parser.parse(item['date'])

    # 1. Get the ad audio from the cloud and compute its fingerprint.
    downloaded = cloud_download(ad_key=item['ad'])
    # A failed download may come back as None instead of a pair; treat
    # both shapes as "no file" instead of crashing while unpacking.
    filename, md5hash = downloaded if downloaded else (None, None)
    if not filename:
        print('ad file missing', file=sys.stderr)
        return

    # 1.1 Compute the number of segments required according to the total
    # duration of the audio (one segment per 5 seconds).
    try:
        audio = mutagen.mp3.MP3(filename)
        segments_needed = int(round(float(audio.info.length) / float(5)))
    except Exception as ex:
        # `audio` is unbound when MP3() itself fails, so report the path.
        print('file {} is not an mp3'.format(filename))
        print(ex)
        return

    try:
        dejavu.fingerprint_file(filename)
    except Exception as ex:
        # Best effort: recognition is still attempted after a failure here.
        print('cannot fingerprint: {}'.format(ex))

    # 2. Read the list of files from the local database and match each one.
    audios_counter = 0
    results = []
    for path, name, ts in iterate_audios(date, station):
        print(path)
        # Count only the files that actually exist on disk.
        audios_counter += os.path.isfile(path)
        for match in dejavu.recognize(recognizer, path, 5,
                                      ads_filter=[md5hash]):
            try:
                results.append({
                    'confidence': match['confidence'],
                    'timestamp': ts,
                    'offset': match['offset']
                })
                print("{} {}".format(ts, match['confidence']))
            except KeyError as ex:
                print(ex, file=sys.stderr)
            # Advance the timestamp by the length of the matched piece;
            # presumably 'length' is in milliseconds -- TODO confirm.
            ts += match['length'] / 1000

    try:
        response = client.put_schedule_results(
            item['schedule'],
            item['id'],
            None,  # TODO: send results again
            found=find_repetitions(results,
                                   segments_needed=segments_needed),
            # 12 files are expected for the segment; report how many of
            # them were not found on disk, never a negative number.
            missing_files=max(12 - audios_counter, 0)
        )
        print("api response: {}".format(response))
    except ConnectionError as ex:
        print(ex, file=sys.stderr)
    except UserWarning as warn:
        print(str(warn), file=sys.stderr)
def find_repetitions(results, segments_needed=2):
    """Return the timestamps at which runs of confident matches start.

    ``results`` is a sequence of mappings with the keys 'confidence' and
    'timestamp'. A result is confident when its confidence is greater
    than 20. A repetition is recorded once ``segments_needed`` consecutive
    confident results have been seen (values below 1 are treated as 1),
    and its timestamp is the one of the first result of that run. After a
    repetition, results are ignored until a non-confident one appears, so
    a single long run is reported only once.

    Returns the list of timestamps, in the order they were found.
    """
    threshold = 20
    segments_needed = max(segments_needed, 1)

    found = []
    run_length = 0
    run_start = None
    expect_space = False

    for index, result in enumerate(results):
        confident = result['confidence'] > threshold

        if expect_space:
            # Still inside an already reported run: wait for the gap.
            if not confident:
                expect_space = False
            continue

        if not confident:
            # The current run is broken before reaching the needed size.
            run_length = 0
            run_start = None
            continue

        run_length += 1
        if run_start is None:
            run_start = index

        if run_length >= segments_needed:
            found.append(results[run_start]['timestamp'])
            run_length = 0
            # Forget the start of the reported run; otherwise the next
            # run would be reported with this run's timestamp.
            run_start = None
            expect_space = True

    return found
def iterate_audios(dt, station):
    """Yield the audio files recorded for a station during one hour.

    The "file" table is queried for the rows of ``station`` whose
    timestamp lies between ``dt`` minus AHEAD_TIME_AUDIO_TOLERANCE seconds
    and ``dt`` plus 3599 seconds, ordered by ascending timestamp.

    ``dt`` is a datetime object; it is converted with time.mktime, i.e.
    interpreted as local time.

    Yields ``(path, basename, timestamp)`` tuples, one per row.
    """
    from_time = time.mktime(dt.timetuple()) - AHEAD_TIME_AUDIO_TOLERANCE
    # from_time already includes the tolerance, so it is added back here
    # to end the window 3599 seconds after `dt`.
    to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
    print('from {} to {}'.format(from_time, to_time))

    cursor = db.cursor()
    try:
        cursor.execute(
            (
                'select "filename", "timestamp" '
                'from "file" '
                'where "timestamp" between ? and ? '
                'and "station" = ? '
                'order by "timestamp" asc'
            ),
            (from_time, to_time, station),
        )
        # Read every row up front so the cursor is not held open while
        # the caller consumes the generator.
        rows = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()

    for mp3path, ts in rows:
        yield (mp3path, os.path.basename(mp3path), ts)
def cloud_download(ad_key=None):
    """Download the audio file of an ad into AUDIOS_PATH.

    The ad record is read from the 'ads/<ad_key>' database reference and
    its 'path' is fetched from the storage bucket. Finished downloads are
    remembered in cloud_cache and reused while the file is still on disk.

    Returns a ``(local_path, md5_hex)`` pair on success, or
    ``(None, None)`` when the ad record is missing or the server does not
    answer with status 200, so callers can always unpack the result.

    Raises KeyError when the response carries no md5 in 'x-goog-hash';
    errors raised by requests (including timeouts) are propagated.
    """
    cached = cloud_cache.get(ad_key)
    if cached is not None:
        # Reuse the earlier download unless the file was removed from
        # disk, in which case it is fetched again below.
        filename, md5hash = cached
        if os.path.isfile(filename):
            return filename, md5hash

    ad = fbdb.reference('ads/{}'.format(ad_key)).get()
    if not ad or 'path' not in ad:
        # Unknown ad key or record without a file path.
        return None, None

    filename = os.path.basename(ad['path'])
    out_file = os.path.join(AUDIOS_PATH, filename)
    url = '{}/{}'.format(cloud_base_url, ad['path'])
    # Without a timeout an unresponsive server would block forever.
    response = requests.get(url, timeout=30)

    if response.status_code != 200:
        return None, None

    # 'x-goog-hash' looks like "crc32c=<b64>,md5=<b64>"; every value is
    # base64, converted here to its hexadecimal representation.
    pairs = [h.split('=', 1) for h in response.headers['x-goog-hash'].split(',')]
    hashes = {
        pair[0].strip(): hexlify(b64decode(pair[1]))
        for pair in pairs
        if len(pair) == 2
    }
    md5sum = hashes['md5']

    with open(out_file, "wb") as fp:
        fp.write(response.content)

    tp = (out_file, md5sum,)
    cloud_cache[ad_key] = tp
    return tp
# HTTP endpoint that shares the work queue with the processing functions.
app = setup_endpoint(queue=queue)

# The first feed is scheduled here; the loop itself only starts when the
# module is run as a script.
loop = IOLoop.current()
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        loop.start()
    except KeyboardInterrupt:
        # Ctrl-C stops the loop; say so instead of printing a traceback.
        print('process killed')
|