Hugo před 3 roky
rodič
revize
1c7b37a44f
5 změnil soubory, kde provedl 1037 přidání a 2 odebrání
  1. 1 1
      Dockerfile
  2. 773 0
      ondemand/app.ipynb
  3. 1 1
      ondemand/dejavu/__init__.py
  4. 255 0
      ondemand/ondemand.py
  5. 7 0
      requirements.txt

+ 1 - 1
Dockerfile

@@ -5,5 +5,5 @@ RUN python -m pip install --upgrade pip
 RUN pip install cryptography numpy pydub matplotlib scipy tornado requests pyaudio firebase_admin psutil mutagen
 ADD . /code/
 WORKDIR /code/ondemand/
-ENTRYPOINT [ "python", "service.py"]
+ENTRYPOINT [ "python", "ondemand.py"]
 #RUN python setup.py install

+ 773 - 0
ondemand/app.ipynb

@@ -0,0 +1,773 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import warnings\n",
+    "warnings.simplefilter(\"ignore\", UserWarning)\n",
+    "\n",
+    "from __future__ import print_function, absolute_import\n",
+    "from tornado.ioloop import IOLoop\n",
+    "from client import Client, ConnectionError\n",
+    "from boxconfig import parse_config\n",
+    "from dejavu.recognize import FilePerSecondRecognizer\n",
+    "from dejavu import Dejavu, CouldntDecodeError\n",
+    "from endpoint import setup_endpoint\n",
+    "from multiprocessing import Process\n",
+    "import logging as log\n",
+    "import requests\n",
+    "import dateutil\n",
+    "import math\n",
+    "import time\n",
+    "import os\n",
+    "\n",
+    "from queue import Queue, Empty"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "PATH = '/tmp'\n",
+    "AHEAD_TIME_AUDIO_TOLERANCE = 2  # second\n",
+    "MAX_SEGMENT_THREADS = 4\n",
+    "THRESHOLD = 10\n",
+    "SEGMENTS_TOLERANCE_RATE = 0.6\n",
+    "FALL_TOLERANCE_SEGMENTS = 1"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "THRESHOLD_FIXED = 1\n",
+    "THRESHOLD_AVERAGE = 2"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "QUEUE_SINGLE = 1\n",
+    "QUEUE_THREAD = 2"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "MultiAPI = Process"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "config = parse_config()\n",
+    "queue = Queue()\n",
+    "\n",
+    "cloud_base_url = 'https://storage.googleapis.com/{}' \\\n",
+    "    .format(config['bucket'])\n",
+    "recognizer = FilePerSecondRecognizer"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "queue_mode = QUEUE_SINGLE\n",
+    "threshold_mode = THRESHOLD_FIXED"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def obt_siguiente_trabajo():\n",
+    "    url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)\n",
+    "    response = requests.get(url)\n",
+    "    log.info(response.json())\n",
+    "    return response.json()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def descargar_anuncio(ad_path):\n",
+    "    anuncio = os.path.basename(ad_path)\n",
+    "    path = os.path.join(PATH, 'ads')\n",
+    "    os.makedirs(path, exist_ok=True)\n",
+    "    ruta_anuncio = os.path.join(path, anuncio)\n",
+    "\n",
+    "    if os.path.isfile(ruta_anuncio):\n",
+    "        return ruta_anuncio\n",
+    "\n",
+    "    url = '{}/{}'.format(cloud_base_url, ad_path)\n",
+    "    response = requests.get(url)\n",
+    "\n",
+    "    # TODO: Agregar alerta cuando la respuesta no sea 200\n",
+    "    if response.status_code == 200:\n",
+    "        with open(ruta_anuncio, \"wb\") as fp:\n",
+    "            fp.write(response.content)\n",
+    "            return ruta_anuncio\n",
+    "\n",
+    "    else:\n",
+    "        print(\"Error al descargar\")\n",
+    "        print(response)\n",
+    "        return None\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def descargar_media(box, station, media):\n",
+    "    ref = '{}/{}/{}'.format(box, station, media)\n",
+    "    file = os.path.basename(ref)\n",
+    "    path = os.path.join(PATH, 'fourier', box, station)\n",
+    "    os.makedirs(path, exist_ok=True)\n",
+    "    out_file = os.path.join(path, file)\n",
+    "\n",
+    "    if os.path.isfile(out_file):\n",
+    "        return out_file\n",
+    "\n",
+    "    filename = ref.replace(\"/\",\"%2F\") \\\n",
+    "        .replace(\"+\",\"%2B\")\n",
+    "    url = '{}/{}'.format(cloud_base_url, filename)\n",
+    "    response = requests.get(url)\n",
+    "\n",
+    "    if response.status_code == 200:\n",
+    "        with open(out_file, \"wb\") as fp:\n",
+    "            fp.write(response.content)\n",
+    "            return out_file\n",
+    "    else:\n",
+    "        print(\"Error al descargar\")\n",
+    "        print(response)\n",
+    "        return None\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def obt_calibracion(calibracion):\n",
+    "    default = {\n",
+    "        'threshold': 12,\n",
+    "        'tolerance': 0.8,\n",
+    "        'fallTolerance': 1,\n",
+    "        'segmentSize': 5,\n",
+    "    }\n",
+    "\n",
+    "    if 'threshold' in calibracion:\n",
+    "        default['threshold'] = calibracion['threshold']\n",
+    "    if 'tolerance' in calibracion:\n",
+    "        default['tolerance'] = calibracion['tolerance']\n",
+    "    if 'segmentSize' in calibracion:\n",
+    "        default['segmentSize'] = calibracion['segmentSize']\n",
+    "    if 'fallTolerance' in calibracion:\n",
+    "        default['fallTolerance'] = calibracion['fallTolerance']\n",
+    "\n",
+    "    return default"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def encontrar_resultados(resultados, segments_needed=4, calibration=None):\n",
+    "    found_counter = 0\n",
+    "    found_down_counter = 0\n",
+    "    found_index = None\n",
+    "    expect_space = False\n",
+    "    expect_recover = False\n",
+    "    last_value_in_threshold_index = -1\n",
+    "    fall_tolerance = calibration['fallTolerance']\n",
+    "    found = []\n",
+    "\n",
+    "    if threshold_mode == THRESHOLD_FIXED:\n",
+    "        threshold = calibration['threshold']\n",
+    "    elif threshold_mode == THRESHOLD_AVERAGE:\n",
+    "        values = [x['confidence'] for x in resultados]\n",
+    "        threshold = math.ceil(float(sum(values)) / float(len(values)))\n",
+    "\n",
+    "    if segments_needed < 1:\n",
+    "        segments_needed = 1\n",
+    "\n",
+    "    for index, result in enumerate(resultados):\n",
+    "        if not expect_space:\n",
+    "            if result['confidence'] >= threshold:\n",
+    "                found_counter += 1\n",
+    "                last_value_in_threshold_index = index\n",
+    "                if found_index is None:\n",
+    "                    found_index = index\n",
+    "                if expect_recover:\n",
+    "                    found_counter += found_down_counter\n",
+    "                    expect_recover = False\n",
+    "\n",
+    "            elif fall_tolerance:\n",
+    "                if not expect_recover:\n",
+    "                    if last_value_in_threshold_index != -1:\n",
+    "                        expect_recover = True\n",
+    "                        found_down_counter += 1\n",
+    "                    else:\n",
+    "                        pass\n",
+    "                else:\n",
+    "                    found_counter = 0\n",
+    "                    found_down_counter = 0\n",
+    "                    found_index = None\n",
+    "                    expect_recover = False\n",
+    "\n",
+    "            else:\n",
+    "                found_counter = 0\n",
+    "                found_down_counter = 0\n",
+    "                found_index = None\n",
+    "                expect_recover = False\n",
+    "\n",
+    "        else:\n",
+    "            if result['confidence'] <= threshold:\n",
+    "                expect_space = False\n",
+    "\n",
+    "        if found_counter >= segments_needed:\n",
+    "            found_row = resultados[found_index]\n",
+    "            found.append(found_row)\n",
+    "            found_counter = 0\n",
+    "            expect_space = True\n",
+    "\n",
+    "    return found"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def enviar_resultados(item):\n",
+    "    url = 'https://api.fourier.audio/v1/calendario/resultado'\n",
+    "    response = requests.post(url, json=item)\n",
+    "    return response"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 27,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pendiente = obt_siguiente_trabajo()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 28,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Procesar elemento pendiente\n",
+    "\n",
+    "ciudad = pendiente['origen']\n",
+    "estacion = pendiente['estacion']\n",
+    "calibracion = obt_calibracion(pendiente['calibracion'])\n",
+    "tolerance = calibracion['tolerance']\n",
+    "tamano_segmento = calibracion['segmentSize']\n",
+    "longitud_audio = 30"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "try:\n",
+    "    anuncios = []\n",
+    "    id_by_ad = {}\n",
+    "    item_ids = []\n",
+    "    x = 0\n",
+    "    for i in pendiente[\"elementos\"]:\n",
+    "        x = x + 1\n",
+    "        id_by_ad[i['anuncio']] = i['id']\n",
+    "        if i['id'] not in item_ids:\n",
+    "            item_ids.append(i['id'])\n",
+    "\n",
+    "        anuncio = descargar_anuncio(i[\"ruta\"])\n",
+    "        if anuncio is not None:\n",
+    "            anuncios.append(anuncio)\n",
+    "        else:\n",
+    "            print('[process_segment] ad file missing')\n",
+    "\n",
+    "except Exception as err:\n",
+    "    print('[process_segment] [{}] {}'.format(estacion, err))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "try:\n",
+    "    media = []\n",
+    "    for i in pendiente[\"media\"]:\n",
+    "        archivo = descargar_media(ciudad, estacion, i[\"ruta\"])\n",
+    "        if archivo is not None:\n",
+    "            media.append((archivo, i[\"fecha\"], i[\"timestamp\"]))\n",
+    "\n",
+    "except Exception as err:\n",
+    "    print(err)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Fingerprint every downloaded ad into a fresh in-memory Dejavu instance.\n",
+    "dejavu = Dejavu({\"database_type\": \"mem\"})\n",
+    "try:\n",
+    "    for anuncio in anuncios:\n",
+    "        dejavu.fingerprint_file(anuncio)\n",
+    "except Exception as ex:\n",
+    "    # Print the exception bound by this handler (`err` is not defined here).\n",
+    "    print(ex)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Prueba, huellas las grabaciones en lugar de los anuncios\n",
+    "\"\"\"\n",
+    "dejavu = Dejavu({\"database_type\": \"mem\"})\n",
+    "try:\n",
+    "    x = 0\n",
+    "    for ruta, fecha, ts in media:\n",
+    "        print(\"Huellando %s\" % (ruta,))\n",
+    "        dejavu.fingerprint_file(ruta)\n",
+    "except Exception as ex:\n",
+    "    print(err)\n",
+    "\"\"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "if tamano_segmento == 'integer':\n",
+    "    tamano_segmento = int(longitud_audio)\n",
+    "elif tamano_segmento == 'ceil':\n",
+    "    tamano_segmento = int(math.ceil(longitud_audio / 5)) * 5\n",
+    "\n",
+    "segmentos_necesarios = int(round(float(longitud_audio) / float(tamano_segmento)))\n",
+    "segmentos_necesarios = int(round(segmentos_necesarios * tolerance))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "anuncios_en_paralelo = 5 # Este valor debe venir desde el php\n",
+    "\n",
+    "dejavu = None\n",
+    "cont_media = len(media)\n",
+    "cont_anuncios = len(anuncios)\n",
+    "resultados = []\n",
+    "v = []\n",
+    "try:\n",
+    "    x = 0\n",
+    "    while x < cont_anuncios:\n",
+    "        y = 0\n",
+    "        dejavu = Dejavu({\"database_type\": \"mem\"})\n",
+    "        print(\"Nueva comparación\")\n",
+    "        while y < anuncios_en_paralelo and x < cont_anuncios:\n",
+    "            anuncio = anuncios[x]\n",
+    "            print(\"Agregando anuncio %s %s\" % (x, os.path.split(anuncio)[-1],))\n",
+    "            dejavu.fingerprint_file(anuncio)\n",
+    "            y += 1\n",
+    "            x += 1\n",
+    "\n",
+    "        for ruta, fecha, ts in media:\n",
+    "            values = []\n",
+    "            try:\n",
+    "                for match in dejavu.recognize(recognizer, ruta, tamano_segmento):\n",
+    "                    name = None\n",
+    "                    ad = None\n",
+    "                    try:\n",
+    "                        ad = match['name']\n",
+    "                        if match['name'] in id_by_ad.keys():\n",
+    "                            name = id_by_ad[match['name']]\n",
+    "                        else:\n",
+    "                            name = match['name']\n",
+    "\n",
+    "                    except KeyError:\n",
+    "                        pass\n",
+    "\n",
+    "                    resultados.append({\n",
+    "                        'ad': ad,\n",
+    "                        'confidence': match['confidence'],\n",
+    "                        'timestamp': ts,\n",
+    "                        'offset': match['offset'],\n",
+    "                        'name': name\n",
+    "                    })\n",
+    "                    values.append(str(match['confidence']))\n",
+    "                    ts = ts + 5\n",
+    "\n",
+    "                v.append(','.join(values))\n",
+    "                print('[process_segment] [{2}] {0} {1}'.format(\n",
+    "                    os.path.split(ruta)[-1],\n",
+    "                    ','.join(values),\n",
+    "                    estacion,\n",
+    "                ))\n",
+    "\n",
+    "            except CouldntDecodeError as ex:\n",
+    "                log.error('[process_segment] {}'.format(ex))\n",
+    "\n",
+    "    try:\n",
+    "        encontrados = {}\n",
+    "        for i in item_ids:\n",
+    "            resultado = [r for r in resultados if r[\"name\"] == i]\n",
+    "            encontrados[i] = encontrar_resultados(resultado, \n",
+    "                segments_needed=tamano_segmento,\n",
+    "                calibration=calibracion,)\n",
+    "\n",
+    "        for id in encontrados:\n",
+    "            for e in encontrados[id]:\n",
+    "                for i in pendiente['elementos']:\n",
+    "                    if i['id'] == id and i['anuncio'] == e['ad']:\n",
+    "                        if 'encontrados' not in i:\n",
+    "                            i['encontrados'] = []\n",
+    "                        i['encontrados'].append(e)\n",
+    "                        break\n",
+    "\n",
+    "        pendiente[\"archivos_perdidos\"] = 0\n",
+    "        pendiente[\"total_archivos\"] = cont_media\n",
+    "        # response = enviar_resultados(pendiente)\n",
+    "    except ConnectionError as ex:\n",
+    "        pass\n",
+    "    except UserWarning as warn:\n",
+    "        pass\n",
+    "\n",
+    "except Exception as ex:\n",
+    "    print(err)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "try:\n",
+    "    encontrados = {}\n",
+    "    for i in item_ids:\n",
+    "        r = [result for result in resultados if result[\"name\"] == i]\n",
+    "        encontrados[i] = encontrar_resultados(r, segments_needed=segmentos_necesarios, calibration=calibracion,)\n",
+    "\n",
+    "    for id in encontrados:\n",
+    "        for e in encontrados[id]:\n",
+    "            for i in pendiente['elementos']:\n",
+    "                if i['id'] == id and i['anuncio'] == e['ad']:\n",
+    "                    if 'encontrados' not in i:\n",
+    "                        i['encontrados'] = []\n",
+    "                    i['encontrados'].append(e)\n",
+    "                    break\n",
+    "\n",
+    "    pendiente[\"archivos_perdidos\"] = 0\n",
+    "    pendiente[\"total_archivos\"] = cont_media\n",
+    "    response = enviar_resultados(pendiente)\n",
+    "except ConnectionError as ex:\n",
+    "    pass\n",
+    "except UserWarning as warn:\n",
+    "    pass\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 31,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'-MSvVbBq7qIwUi4L2a8J': 46057046,\n",
+       " '-MlRe9DwDJVtYlLS-6PS': 46057064,\n",
+       " '-MlReJO2cHvAi7XG40mw': 46057082,\n",
+       " '-MlReBIHRuk0dC4WKiZR': 46057100,\n",
+       " '-MlReGOZY5fpsnnWNY6p': 46057118,\n",
+       " '-MlRd2rEdpoyR1LhE0nJ': 46057136,\n",
+       " '-MlRce4eqVUyTkDbIcAN': 46057154,\n",
+       " '-MlRelzkKzHdUpGZlkmD': 46057172,\n",
+       " '-MlRcIWglVS9eKoJZLlv': 46057190,\n",
+       " '-MlWhq2bbe7lED-ApFgp': 46057208,\n",
+       " '-MlRdwzYUS3k5valF9Ri': 46057226,\n",
+       " '-MlRcpOv5kyMAJ52pswd': 46057244,\n",
+       " '-MlReDcD94LuAkmbhq-n': 46057262,\n",
+       " '-MlRe-XN_j1Ns6CDUsP-': 46057280,\n",
+       " '-MlRc_XihusSzapzm6Qa': 46057298,\n",
+       " '-MlRe2hx_xDRiqMsE-pk': 46057316,\n",
+       " '-MlRe6KKd5gg8u5lbekH': 46057334}"
+      ]
+     },
+     "execution_count": 31,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "id_by_ad"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-00-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-05-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-10-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-15-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-20-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-25-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-30-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-35-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-40-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-45-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-50-00-0700.mp3\n",
+      "Huellando /tmp/fourier/g2yJhO8z/101_7_SON_OBR/2021-09-01T06-54-59-0700.mp3\n",
+      "{'id': '7798E7A7396865717C5FF58D40533B17D05D90DC', 'name': '2021-09-01T06-30-00-0700', 'confidence': 78, 'offset': 2559, 'offset_seconds': 285.21535, 'position': 2593, 'file_sha1': '7798E7A7396865717C5FF58D40533B17D05D90DC', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E', 'name': '2021-09-01T06-35-00-0700', 'confidence': 46, 'offset': 46, 'offset_seconds': 5.12697, 'position': 85, 'file_sha1': 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E', 'name': '2021-09-01T06-35-00-0700', 'confidence': 45, 'offset': 91, 'offset_seconds': 10.14248, 'position': 130, 'file_sha1': 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 73, 'offset': 1789, 'offset_seconds': 199.39439, 'position': 1831, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 37, 'offset': 1878, 'offset_seconds': 209.31396, 'position': 1918, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 45, 'offset': 1923, 'offset_seconds': 214.32947, 'position': 1941, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 166, 'offset': 1970, 'offset_seconds': 219.56789, 'position': 2013, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 290, 'offset': 2015, 'offset_seconds': 224.5834, 'position': 2055, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 95, 'offset': 2015, 'offset_seconds': 224.5834, 'position': 2047, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 259, 'offset': 2015, 'offset_seconds': 224.5834, 'position': 2055, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n",
+      "{'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'name': '2021-09-01T06-50-00-0700', 'confidence': 97, 'offset': 2015, 'offset_seconds': 224.5834, 'position': 2047, 'file_sha1': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB', 'match_time': {...}, 'length': 5000}\n"
+     ]
+    }
+   ],
+   "source": [
+    "anuncios_en_paralelo = 1 # Este valor debe venir desde el php\n",
+    "\n",
+    "dejavu = None\n",
+    "cont_media = len(media)\n",
+    "cont_anuncios = len(anuncios)\n",
+    "resultados = []\n",
+    "v = []\n",
+    "founds = {}\n",
+    "try:\n",
+    "    dejavu = Dejavu({\"database_type\": \"mem\"})\n",
+    "    try:\n",
+    "        x = 0\n",
+    "        for ruta, fecha, ts in media:\n",
+    "            print(\"Huellando %s\" % (ruta,))\n",
+    "            dejavu.fingerprint_file(ruta)\n",
+    "    except Exception as ex:\n",
+    "        print(err)\n",
+    "\n",
+    "    for anuncio in anuncios:\n",
+    "        for i in dejavu.recognize(recognizer, anuncio, 5):\n",
+    "            if not \"id\" in i:\n",
+    "                continue\n",
+    "\n",
+    "            if i[\"confidence\"] < 35:\n",
+    "                continue\n",
+    "\n",
+    "            print(i)\n",
+    "            obj = i\n",
+    "            obj[\"match_time\"] = None\n",
+    "            dict = {\n",
+    "                \"id\": obj[\"id\"],\n",
+    "                \"anuncio\": anuncio,\n",
+    "                \"fecha\": obj[\"name\"],\n",
+    "                \"confianza\": obj[\"confidence\"],\n",
+    "                \"longitud\": obj[\"length\"],\n",
+    "                \"desfase_segundos\": obj[\"offset_seconds\"]\n",
+    "            }\n",
+    "\n",
+    "            if i[\"id\"] in founds.keys():\n",
+    "                founds[i[\"id\"]][\"longitud\"] = founds[i[\"id\"]][\"longitud\"] + dict[\"longitud\"]\n",
+    "                founds[i[\"id\"]][\"confianza\"] = founds[i[\"id\"]][\"confianza\"] + dict[\"confianza\"]\n",
+    "                continue\n",
+    "\n",
+    "            founds[i[\"id\"]] = dict\n",
+    "\n",
+    "except Exception as ex:\n",
+    "    print(err)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'7798E7A7396865717C5FF58D40533B17D05D90DC': {'id': '7798E7A7396865717C5FF58D40533B17D05D90DC',\n",
+       "  'anuncio': '/tmp/ads/-MlRe9DwDJVtYlLS-6PS',\n",
+       "  'fecha': '2021-09-01T06-30-00-0700',\n",
+       "  'confianza': 78,\n",
+       "  'longitud': 5000,\n",
+       "  'desfase_segundos': 285.21535},\n",
+       " 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E': {'id': 'F4E326C6E5498481EBF40FEDB64D2D88F4DC4C0E',\n",
+       "  'anuncio': '/tmp/ads/-MlRe9DwDJVtYlLS-6PS',\n",
+       "  'fecha': '2021-09-01T06-35-00-0700',\n",
+       "  'confianza': 91,\n",
+       "  'longitud': 10000,\n",
+       "  'desfase_segundos': 5.12697},\n",
+       " '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB': {'id': '373A10DF13ADBE563AB4A6F39D9DF0C73AA537FB',\n",
+       "  'anuncio': '/tmp/ads/-MlReJO2cHvAi7XG40mw',\n",
+       "  'fecha': '2021-09-01T06-50-00-0700',\n",
+       "  'confianza': 1062,\n",
+       "  'longitud': 40000,\n",
+       "  'desfase_segundos': 199.39439}}"
+      ]
+     },
+     "execution_count": 26,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "founds"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "item = get_pendings()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "item[\"elementos\"]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cloud_download_file(item[\"origen\"], item[\"estacion\"], item['archivos'][0]['filename'])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "path = os.path.join(AUDIOS_PATH, 'fourier', 'ciudad', 'estacion')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "path"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "os.makedirs(path, exist_ok=True)"
+   ]
+  }
+ ],
+ "metadata": {
+  "interpreter": {
+   "hash": "631ec0267e76ead327ae18a3cdf21f6916cbb309615a11f42bd594f9973a79cd"
+  },
+  "kernelspec": {
+   "display_name": "Python 3.9.7 64-bit ('venv': venv)",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

+ 1 - 1
ondemand/dejavu/__init__.py

@@ -160,7 +160,7 @@ class Dejavu(object):
         # return match info
         nseconds = round(float(largest) / fingerprint.DEFAULT_FS *
                          fingerprint.DEFAULT_WINDOW_SIZE *
-                         fingerprint.DEFAULT_OVERLAP_RATIO, 5)
+                         (1 - fingerprint.DEFAULT_OVERLAP_RATIO), 5)
         song = {
             Dejavu.SONG_ID : song_id,
             Dejavu.SONG_NAME : songname,

+ 255 - 0
ondemand/ondemand.py

@@ -0,0 +1,255 @@
+from __future__ import print_function, absolute_import
+from tornado.ioloop import IOLoop
+from client import Client, ConnectionError
+from boxconfig import parse_config
+from dejavu.recognize import FilePerSecondRecognizer
+from dejavu import Dejavu, CouldntDecodeError
+from endpoint import setup_endpoint
+
+from multiprocessing import Process
+import logging as log
+import requests
+import dateutil
+import math
+import time
+import os
+
+from queue import Queue, Empty
+
+# Log records are rendered as "[timestamp] [module] message"; INFO and above are emitted.
+log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
+
+# Base directory under which downloaded ads ('ads/') and station media
+# ('fourier/<box>/<station>/') are stored by the download helpers below.
+PATH = '/tmp'
+# NOTE(review): the five tuning constants below are not referenced by any
+# function visible in this module -- confirm whether they are still needed.
+AHEAD_TIME_AUDIO_TOLERANCE = 2  # second
+MAX_SEGMENT_THREADS = 4
+THRESHOLD = 10
+SEGMENTS_TOLERANCE_RATE = 0.6
+FALL_TOLERANCE_SEGMENTS = 1
+
+# Possible values for `threshold_mode` (set at the end of this section).
+THRESHOLD_FIXED = 1
+THRESHOLD_AVERAGE = 2
+
+# NOTE(review): not referenced in the visible part of this module.
+QUEUE_SINGLE = 1
+
+# Box configuration; this module reads its 'bucket' and 'device_id' keys.
+config = parse_config()
+# In-memory queue of pending jobs: filled by llenar_pila(), drained by
+# procesar_siguiente_pila(), and also handed to setup_endpoint().
+queue = Queue()
+# Base URL of the cloud storage bucket that ads and media are downloaded from.
+cloud_base_url = 'https://storage.googleapis.com/{}' \
+    .format(config['bucket'])
+# Recognizer class passed to Dejavu.recognize() in procesar_trabajo().
+recognizer = FilePerSecondRecognizer
+
+
+# NOTE(review): not read by any function visible in this module -- confirm.
+threshold_mode = THRESHOLD_FIXED
+
+
+def obt_siguiente_trabajo():
+    """Fetch the next pending job for this device from the Fourier API.
+
+    The device id is taken from ``config['device_id']`` and sent as the
+    ``id`` query parameter.
+
+    Returns:
+        The decoded JSON body of the response.
+
+    Raises:
+        requests.RequestException: on network failure or timeout.
+        ValueError (or a subclass): if the body is not valid JSON.
+    """
+    url = 'https://api.fourier.audio/na/calendario/pendiente'
+    # A timeout keeps a stalled connection from blocking the IOLoop forever;
+    # `params` also takes care of URL-encoding the device id.
+    response = requests.get(url, params={'id': config['device_id']}, timeout=30)
+    # Decode the body once and reuse it for both logging and the return value.
+    trabajo = response.json()
+    log.info(trabajo)
+    return trabajo
+
+
+def descargar_anuncio(ad_path):
+    """Download an ad from cloud storage into the local cache.
+
+    The file is stored as ``<PATH>/ads/<basename of ad_path>``. If a file
+    with that name already exists it is reused and nothing is downloaded.
+
+    Args:
+        ad_path: Path of the ad relative to ``cloud_base_url``.
+
+    Returns:
+        The local path of the cached ad, or None if the server did not
+        answer with HTTP 200.
+
+    Raises:
+        requests.RequestException: on network failure or timeout.
+        OSError: if the cache directory or file cannot be written.
+    """
+    anuncio = os.path.basename(ad_path)
+    path = os.path.join(PATH, 'ads')
+    os.makedirs(path, exist_ok=True)
+    ruta_anuncio = os.path.join(path, anuncio)
+
+    if os.path.isfile(ruta_anuncio):
+        return ruta_anuncio
+
+    url = '{}/{}'.format(cloud_base_url, ad_path)
+    # Bounded wait so a stalled download cannot hang the service.
+    response = requests.get(url, timeout=60)
+
+    # TODO: raise an alert when the response is not 200
+    if response.status_code != 200:
+        log.error('Error al descargar {}: {}'.format(url, response))
+        return None
+
+    # Write to a temporary name and rename afterwards: a partially written
+    # file must never be mistaken for a valid cache entry on the next run.
+    ruta_parcial = ruta_anuncio + '.part'
+    with open(ruta_parcial, "wb") as fp:
+        fp.write(response.content)
+    os.replace(ruta_parcial, ruta_anuncio)
+    return ruta_anuncio
+
+
+def descargar_media(box, station, media):
+    """Download a station recording from cloud storage into the local cache.
+
+    The remote object is ``<box>/<station>/<media>``; it is stored locally as
+    ``<PATH>/fourier/<box>/<station>/<basename>``. If that file already
+    exists it is reused and nothing is downloaded.
+
+    Args:
+        box: First component of the remote object name.
+        station: Second component of the remote object name.
+        media: Final component (file name) of the remote object.
+
+    Returns:
+        The local path of the cached file, or None if the server did not
+        answer with HTTP 200.
+
+    Raises:
+        requests.RequestException: on network failure or timeout.
+        OSError: if the cache directory or file cannot be written.
+    """
+    ref = '{}/{}/{}'.format(box, station, media)
+    file = os.path.basename(ref)
+    path = os.path.join(PATH, 'fourier', box, station)
+    os.makedirs(path, exist_ok=True)
+    out_file = os.path.join(path, file)
+
+    if os.path.isfile(out_file):
+        return out_file
+
+    # The object name is sent as a single path segment, so '/' and '+' are
+    # percent-encoded by hand (same encoding as before).
+    filename = ref.replace("/", "%2F").replace("+", "%2B")
+    url = '{}/{}'.format(cloud_base_url, filename)
+    # Bounded wait so a stalled download cannot hang the service.
+    response = requests.get(url, timeout=60)
+
+    if response.status_code != 200:
+        log.error('Error al descargar {}: {}'.format(url, response))
+        return None
+
+    # Write to a temporary name and rename afterwards: a partially written
+    # file must never be mistaken for a valid cache entry on the next run.
+    partial_file = out_file + '.part'
+    with open(partial_file, "wb") as fp:
+        fp.write(response.content)
+    os.replace(partial_file, out_file)
+    return out_file
+
+
+def obt_calibracion(calibracion):
+    """Merge a job's calibration over the built-in defaults.
+
+    Only the keys 'threshold', 'tolerance', 'segmentSize' and
+    'fallTolerance' are taken from `calibracion`; anything else in it is
+    ignored. A new dict is returned on every call.
+    """
+    ajustes = {
+        'threshold': 12,
+        'tolerance': 0.8,
+        'fallTolerance': 1,
+        'segmentSize': 5,
+    }
+
+    # Override each default only when the caller supplied that key.
+    for clave in ('threshold', 'tolerance', 'segmentSize', 'fallTolerance'):
+        if clave in calibracion:
+            ajustes[clave] = calibracion[clave]
+
+    return ajustes
+
+
+def enviar_resultados(item):
+    """POST `item` as a JSON body to the results endpoint.
+
+    Returns the `requests` response object unchanged; the status code is
+    not inspected here.
+    """
+    return requests.post(
+        'https://api.fourier.audio/v1/calendario/resultado',
+        json=item,
+    )
+
+
+def llenar_pila():
+    """Ask the server for pending scheduled work and queue it in memory.
+
+    A job is queued only when its "elementos" list is non-empty. Afterwards
+    the function schedules the next step on the IOLoop: processing when the
+    queue holds something, otherwise another poll in 30 seconds.
+    """
+    try:
+        trabajo = obt_siguiente_trabajo()
+        if len(trabajo["elementos"]) > 0:
+            queue.put(trabajo)
+
+        if queue.empty():
+            # Nothing to process yet: poll the server again later.
+            loop.add_timeout(time.time() + 30, llenar_pila)
+        else:
+            loop.add_callback(procesar_siguiente_pila)
+
+    except ConnectionError as error:
+        log.error('[feed_queue] cannot feed: {}, retryig later'.format(error))
+        loop.add_timeout(time.time() + 15, llenar_pila)
+
+    except Exception as error:
+        # Unknown errors: log, schedule a retry in a minute, then let the
+        # error propagate to the caller.
+        log.error('[feed_queue] {}'.format(error))
+        loop.add_timeout(time.time() + 60, llenar_pila)
+        raise error
+
+
+def procesar_siguiente_pila():
+    """Take the next job off the queue, without blocking, and process it.
+
+    After a job is handled (successfully or not) the function reschedules
+    itself; once the queue is empty it hands control back to llenar_pila.
+    """
+    try:
+        procesar_trabajo(queue.get_nowait())
+        loop.add_callback(procesar_siguiente_pila)
+    except Empty:
+        # Queue drained: go back to polling the server for work.
+        loop.add_callback(llenar_pila)
+    except Exception as error:
+        # A failing job is logged and skipped so the rest still get processed.
+        log.error(error)
+        loop.add_callback(procesar_siguiente_pila)
+
+
+def procesar_trabajo(pendiente, segment_size=5, min_confidence=35):
+    """Process one pending job: look for its ads inside its recordings.
+
+    The job's ads and media files are downloaded (or taken from the local
+    cache), every media file is fingerprinted into an in-memory Dejavu
+    instance, and each ad is then run through the recognizer against
+    those fingerprints.
+
+    Args:
+        pendiente: Job dict. Reads 'origen', 'estacion', 'elementos' (each
+            with 'ruta') and 'media' (each with 'ruta', 'fecha',
+            'timestamp').
+        segment_size: Third argument passed to ``Dejavu.recognize``.
+        min_confidence: Matches with a lower 'confidence' are discarded.
+
+    Returns:
+        Dict keyed by the matched 'id' reported by Dejavu. Each value has
+        'id', 'anuncio', 'fecha', 'confianza', 'longitud' and
+        'desfase_segundos'; 'confianza' and 'longitud' are summed over all
+        accepted matches sharing that id, the other fields come from the
+        first one. Download and recognition errors are logged, not raised,
+        so the result may be partial or empty.
+
+    Raises:
+        KeyError: if `pendiente` lacks 'origen' or 'estacion'.
+    """
+    ciudad = pendiente['origen']
+    estacion = pendiente['estacion']
+
+    # Download the ads; a failure stops the loop but keeps what was fetched.
+    anuncios = []
+    try:
+        for elemento in pendiente["elementos"]:
+            anuncio = descargar_anuncio(elemento["ruta"])
+            if anuncio is not None:
+                anuncios.append(anuncio)
+    except Exception as err:
+        log.error('[process_segment] [{}] {}'.format(estacion, err))
+
+    # Download the recordings the ads will be searched in.
+    media = []
+    try:
+        for elemento in pendiente["media"]:
+            archivo = descargar_media(ciudad, estacion, elemento["ruta"])
+            if archivo is not None:
+                media.append((archivo, elemento["fecha"], elemento["timestamp"]))
+    except Exception as err:
+        log.error('[process_segment] [{}] {}'.format(estacion, err))
+
+    resultados = {}
+    try:
+        dejavu = Dejavu({"database_type": "mem"})
+        try:
+            for ruta, _fecha, _ts in media:
+                print("Huellando %s" % (ruta,))
+                dejavu.fingerprint_file(ruta)
+        except Exception as ex:
+            # Log the exception bound by this handler; the old code printed
+            # an unbound `err`, which raised and hid the real error.
+            log.error('[process_segment] [{}] {}'.format(estacion, ex))
+
+        for anuncio in anuncios:
+            for match in dejavu.recognize(recognizer, anuncio, segment_size):
+                if "id" not in match:
+                    continue
+
+                if match["confidence"] < min_confidence:
+                    continue
+
+                print(match)
+                previo = resultados.get(match["id"])
+                if previo is not None:
+                    # NOTE(review): entries are keyed by match id only, so
+                    # matches from a different ad with the same id are folded
+                    # into the first ad's entry -- confirm this is intended.
+                    previo["longitud"] += match["length"]
+                    previo["confianza"] += match["confidence"]
+                    continue
+
+                resultados[match["id"]] = {
+                    "id": match["id"],
+                    "anuncio": anuncio,
+                    "fecha": match["name"],
+                    "confianza": match["confidence"],
+                    "longitud": match["length"],
+                    "desfase_segundos": match["offset_seconds"]
+                }
+
+    except Exception as ex:
+        log.error('[process_segment] [{}] {}'.format(estacion, ex))
+
+    # Previously the results were computed and then dropped; returning them
+    # is backward-compatible because existing callers ignore the value.
+    return resultados
+
+
+# The endpoint is given the same job queue the polling loop uses.
+# NOTE(review): presumably it lets external requests enqueue jobs -- confirm
+# against endpoint.setup_endpoint.
+app = setup_endpoint(queue=queue)
+# `loop` is the module-level IOLoop that llenar_pila() and
+# procesar_siguiente_pila() use to schedule themselves.
+loop = IOLoop.current()
+# First poll for pending work; it runs once the loop has been started.
+loop.add_callback(llenar_pila)
+
+if __name__ == '__main__':
+    try:
+        log.info('Starting ondemand service')
+        # Blocks here, running scheduled callbacks until the loop is stopped.
+        loop.start()
+    except KeyboardInterrupt:
+        log.error('Process killed')

+ 7 - 0
requirements.txt

@@ -3,3 +3,10 @@ firebase_admin
 python-firebase
 requests
 mutagen
+python-dateutil
+numpy
+pyaudio
+psutil
+pydub
+matplotlib
+scipy