const config = require("fourier-common").config; const db = require("fourier-common").fbconn.database(); const spawn = require("child_process").spawn; const path = require("path"); const express = require('express'); var working = false; var queue = {}; var od_ref = db.ref("devices").child(config.device_id).child("ondemand"); var proc; function process_queue() { if (working) return; working = true; //var key = Object.keys(queue)[0]; //if (!key) return; //var job = queue[key]; //delete queue[key]; var job = queue.shift(); if(!job) { console.log("nothing left to do"); working = false; return; } var key = job.key; if (job.status == "done" || job.status === "no_index") { setTimeout(() => { working = false; process_queue(); }, 1000); return; } od_ref.child(key).update({status: "in_progress"}); var params = [ path.join(config.installDir,"fourier/process_timespan_nodb.py"), config.device_id, job.station, job.fromTime, job.toTime, job.ad, key, ]; if (job.ads) params.push(job.ads.join(",")); console.log("ondemand: " + params.join(" ")); proc = spawn("python", params); proc.stdout.on("data", data => { console.log("ondemand data: " + data); }); proc.stderr.on("data", data => { console.log("ondemand error: " + data); }); proc.on("exit", code => { working = false; proc = null; if (code == 2) { od_ref.child(key).update({status: "no_index"}); } else { od_ref.child(key).update({status: "done"}); } console.log("done", key); process_queue(); }); } function addAndProc(key, job) { //queue[key] = job; queue.push(job); job.key = key; process_queue(); } function start() { queue = []; od_ref.on("child_added", snp => { console.log("added", snp.key); addAndProc(snp.key, snp.val()); }); //od_ref.on("child_changed", snp => { // addAndProc(snp.key, snp.val()); //}); } endpoint.get('/info', function(request, response) { response.send(JSON.stringify({ 'version': version, 'processID': process.id, 'memoryUsage': process.memoryUsage().rss })); }); endpoint.listen(49223); function stop() { od_ref.off(); queue = {}; } module.exports = { start: start, stop: stop, };