123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- const config = require("fourier-common").config;
- const db = require("fourier-common").fbconn.database();
- const spawn = require("child_process").spawn;
- const path = require("path");
- var working = false;
- var queue = {};
- var od_ref = db.ref("devices").child(config.device_id).child("ondemand");
- var proc;
- function process_queue() {
- if (working) return;
- working = true;
- //var key = Object.keys(queue)[0];
- //if (!key) return;
- //var job = queue[key];
- //delete queue[key];
- var job = queue.shift();
- if(!job) {
- console.log("nothing left to do");
- working = false;
- return;
- }
- var key = job.key;
- if (job.status == "done" || job.status === "no_index") {
- setTimeout(() => {
- working = false;
- process_queue();
- }, 1000);
- return;
- }
- od_ref.child(key).update({status: "in_progress"});
- var params = [
- path.join(config.installDir,"fourier/process_timespan_nodb.py"),
- config.device_id,
- job.station,
- job.fromTime,
- job.toTime,
- job.ad,
- key,
- ];
- if (job.ads) params.push(job.ads.join(","));
- console.log("ondemand: " + params.join(" "));
- proc = spawn("python", params);
- proc.stdout.on("data", data => {
- console.log("ondemand data: " + data);
- });
- proc.stderr.on("data", data => {
- console.log("ondemand error: " + data);
- });
- proc.on("exit", code => {
- working = false;
- proc = null;
- if (code == 2) {
- od_ref.child(key).update({status: "no_index"});
- } else {
- od_ref.child(key).update({status: "done"});
- }
- console.log("done", key);
- process_queue();
- });
- }
- function addAndProc(key, job) {
- //queue[key] = job;
- queue.push(job);
- job.key = key;
- process_queue();
- }
- function start() {
- queue = [];
- od_ref.on("child_added", snp => {
- console.log("added", snp.key);
- addAndProc(snp.key, snp.val());
- });
- //od_ref.on("child_changed", snp => {
- // addAndProc(snp.key, snp.val());
- //});
- }
- function stop() {
- od_ref.off();
- queue = {};
- }
- module.exports = {
- start: start,
- stop: stop,
- };
|