ondemand.js 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. const config = require("fourier-common").config;
  2. const db = require("fourier-common").fbconn.database();
  3. const spawn = require("child_process").spawn;
  4. const path = require("path");
  5. const express = require('express');
  6. const version = require('./package.json').version;
  7. var working = false;
  8. var queue = {};
  9. var od_ref = db.ref("devices").child(config.device_id).child("ondemand");
  10. var proc;
  11. function process_queue() {
  12. if (working) return;
  13. working = true;
  14. //var key = Object.keys(queue)[0];
  15. //if (!key) return;
  16. //var job = queue[key];
  17. //delete queue[key];
  18. var job = queue.shift();
  19. if(!job) {
  20. console.log("nothing left to do");
  21. working = false;
  22. return;
  23. }
  24. var key = job.key;
  25. if (job.status == "done" || job.status === "no_index") {
  26. setTimeout(() => {
  27. working = false;
  28. process_queue();
  29. }, 1000);
  30. return;
  31. }
  32. od_ref.child(key).update({status: "in_progress"});
  33. var params = [
  34. path.join(config.installDir,"fourier/process_timespan_nodb.py"),
  35. config.device_id,
  36. job.station,
  37. job.fromTime,
  38. job.toTime,
  39. job.ad,
  40. key,
  41. ];
  42. if (job.ads) params.push(job.ads.join(","));
  43. console.log("ondemand: " + params.join(" "));
  44. proc = spawn("python", params);
  45. proc.stdout.on("data", data => {
  46. console.log("ondemand data: " + data);
  47. });
  48. proc.stderr.on("data", data => {
  49. console.log("ondemand error: " + data);
  50. });
  51. proc.on("exit", code => {
  52. working = false;
  53. proc = null;
  54. if (code == 2) {
  55. od_ref.child(key).update({status: "no_index"});
  56. } else {
  57. od_ref.child(key).update({status: "done"});
  58. }
  59. console.log("done", key);
  60. process_queue();
  61. });
  62. }
  63. function addAndProc(key, job) {
  64. //queue[key] = job;
  65. queue.push(job);
  66. job.key = key;
  67. process_queue();
  68. }
  69. function start() {
  70. queue = [];
  71. od_ref.on("child_added", snp => {
  72. console.log("added", snp.key);
  73. addAndProc(snp.key, snp.val());
  74. });
  75. //od_ref.on("child_changed", snp => {
  76. // addAndProc(snp.key, snp.val());
  77. //});
  78. }
  79. let endpoint = express();
  80. endpoint.get('/info', function(request, response) {
  81. response.send(JSON.stringify({
  82. 'version': version,
  83. 'processID': process.id,
  84. 'memoryUsage': process.memoryUsage().rss
  85. }));
  86. });
  87. endpoint.listen(49223);
  88. function stop() {
  89. od_ref.off();
  90. queue = {};
  91. }
  92. module.exports = {
  93. start: start,
  94. stop: stop,
  95. };