123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
// Shared project package: supplies the configuration object and the database handle.
const { config, fbconn } = require("fourier-common");
const db = fbconn.database();

// Node.js built-ins.
const { spawn } = require("child_process");
const path = require("path");

// Third-party packages and this package's own manifest (for its version string).
const express = require('express');
const { version } = require('./package.json');
const moment = require('moment');

// console-stamp is handed the global console together with the pattern 'HH:MM:ss.l'.
require('console-stamp')(console, 'HH:MM:ss.l');
// True while a job is being handled; process_queue() returns early when it is set.
let working = false;

// FIFO list of pending jobs. It has to be an array from the very beginning:
// the rest of this file calls shift() and push() on it and reports its length.
let queue = [];
// Handle of the child process currently running a job; assigned by process_queue().
let proc;

// Database reference to devices/<config.device_id>/ondemand, where the jobs live.
const od_ref = db
  .ref("devices")
  .child(config.device_id)
  .child("ondemand");
/**
 * Take the next job off the queue and run the recognition script for it.
 *
 * Only one job is handled at a time: the module-level `working` flag is set
 * while a job is in flight, and the function returns immediately when it is
 * already set. Once a job finishes (or is skipped) the function calls itself
 * again to pick up the next one.
 *
 * Jobs whose status is "done" or "no_index" are skipped. Any other job is
 * marked "in_progress", then `python process_timespan_nodb.py ...` is spawned
 * with the job's fields as arguments. Exit code 0 marks the job "done"; any
 * other outcome is only logged.
 */
function process_queue() {
  if (working) return;
  working = true;

  // Guard against `queue` not being an array, in which case there is nothing to shift.
  const job = Array.isArray(queue) ? queue.shift() : undefined;
  if (!job) {
    console.log("nothing left to do");
    working = false;
    return;
  }

  const key = job.key;

  // Already-finished jobs are skipped; the next one is tried one second later.
  if (job.status === "done" || job.status === "no_index") {
    setTimeout(() => {
      working = false;
      process_queue();
    }, 1000);
    return;
  }

  // Write fields to this job's record. If update() hands back a promise, a
  // rejection is logged instead of surfacing as an unhandled rejection.
  const update_job = fields => {
    const pending = od_ref.child(key).update(fields);
    if (pending && typeof pending.catch === "function") {
      pending.catch(err => {
        console.error(`ondemand update failed for ${key}: ${err}`);
      });
    }
  };

  update_job({
    status: "in_progress",
    startedRecognizing: moment().unix()
  });

  const params = [
    path.join(config.installDir, "fourier/process_timespan_nodb.py"),
    config.device_id,
    job.station,
    job.fromTime,
    job.toTime,
    job.ad,
    key,
  ];
  if (job.ads) params.push(job.ads.join(","));
  console.log(`ondemand: ${params.join(" ")}`);

  const child = spawn("python", params);
  proc = child;

  child.stdout.on("data", data => {
    console.log(`ondemand data: ${data}`);
  });
  child.stderr.on("data", data => {
    console.error(`ondemand error: ${data}`);
  });

  // 'error' (e.g. the interpreter could not be started) and 'exit' can both
  // fire for the same child, so the queue is advanced only once per job.
  let finished = false;
  const finish = succeeded => {
    if (finished) return;
    finished = true;
    working = false;
    proc = null;
    if (succeeded) {
      update_job({
        status: "done",
        endedRecognizing: moment().unix()
      });
      console.log("done", key);
    }
    process_queue();
  };

  // Without this listener a failed spawn raises an unhandled 'error' event
  // and `working` would never be cleared.
  child.on("error", err => {
    console.error(`ondemand spawn error: ${err}`);
    finish(false);
  });
  child.on("exit", code => {
    if (code !== 0) {
      console.error(`wrong process code: ${code}`);
    }
    finish(code === 0);
  });
}
/**
 * Queue a job under its database key and kick the worker.
 *
 * @param {string} key - Key of the job record; stored on the queued entry as `key`.
 * @param {Object} job - Job fields as delivered by the database snapshot.
 */
function addAndProc(key, job) {
  // A job has to be an object: its fields are read when it is processed.
  if (job === null || typeof job !== "object") {
    console.error(`ondemand: ignoring malformed job ${key}`);
    return;
  }
  // Queue a copy that carries the key instead of mutating the caller's object.
  queue.push(Object.assign({}, job, { key: key }));
  process_queue();
}
/**
 * Begin listening for on-demand jobs.
 *
 * Resets the queue to an empty array, then subscribes to "child_added" on the
 * on-demand reference ordered by 'fromTime'. Every child reported is logged
 * and handed to addAndProc() together with its key.
 */
function start() {
  const onJobAdded = snapshot => {
    const jobKey = snapshot.key;
    console.log("added", jobKey);
    addAndProc(jobKey, snapshot.val());
  };

  queue = [];
  const jobsByFromTime = od_ref.orderByChild('fromTime');
  jobsByFromTime.on("child_added", onJobAdded);
}
// Diagnostics HTTP server on port 49223. GET /info answers with a JSON string
// holding the package version, this process's id, its resident set size and
// the number of queued jobs.
const endpoint = express();
endpoint.get('/info', function(request, response) {
  response.send(JSON.stringify({
    'version': version,
    // The process id lives in process.pid; `process.id` is undefined, and
    // JSON.stringify silently drops undefined-valued fields.
    'processID': process.pid,
    'memoryUsage': process.memoryUsage().rss,
    // Report 0 instead of omitting the field when queue is not an array.
    'queueSize': Array.isArray(queue) ? queue.length : 0
  }));
});
endpoint.listen(49223);
/**
 * Stop listening for on-demand jobs and drop everything still queued.
 *
 * Calls off() on the on-demand reference and replaces the queue with a fresh
 * empty array.
 */
function stop() {
  od_ref.off();
  // Reset to an empty array, not `{}`: process_queue() can still run after
  // stop() (for instance from a child process's exit handler) and calls
  // shift() on the queue.
  queue = [];
}
// Public API of this module: start() and stop().
module.exports = { start, stop };
|