ondemand.js 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. const config = require("fourier-common").config;
  2. const db = require("fourier-common").fbconn.database();
  3. const spawn = require("child_process").spawn;
  4. const path = require("path");
  // True while process_queue() owns a job (or is pausing after a skipped one);
  // used to keep processing strictly one job at a time.
  let working = false;
  // FIFO list of pending jobs. It must be an array: process_queue() calls
  // shift() on it and addAndProc() calls push().
  let queue = [];
  // Database location holding this device's on-demand jobs:
  // devices/<config.device_id>/ondemand
  const od_ref = db.ref("devices").child(config.device_id).child("ondemand");
  // Handle of the child process currently running a job, if any.
  let proc;
  /**
   * Run the next queued on-demand job, one job at a time.
   *
   * Takes the first job off `queue`. Jobs whose status is already "done" or
   * "no_index" are skipped and the next one is tried after a 1 second pause.
   * Any other job is marked "in_progress" and handed to
   * fourier/process_timespan_nodb.py through a `python` child process.
   * When the child exits the job is marked "no_index" (exit code 2) or
   * "done" (any other exit code) and the next job is started.
   *
   * If the child cannot be launched at all, the failure is logged, the
   * worker is released and the queue keeps draining. The job's status is
   * left untouched in that case because no failure status is defined here.
   */
  function process_queue() {
    // Re-entrancy guard: the completion handlers below call back into here.
    if (working) return;
    working = true;

    // stop() may leave `queue` as something other than an array; treat
    // that the same as an empty queue instead of throwing on shift().
    const job = Array.isArray(queue) ? queue.shift() : undefined;
    if (!job) {
      console.log("nothing left to do");
      working = false;
      return;
    }

    const key = job.key;
    if (job.status === "done" || job.status === "no_index") {
      // Already handled: release the worker and look at the next entry later.
      setTimeout(() => {
        working = false;
        process_queue();
      }, 1000);
      return;
    }

    od_ref.child(key).update({status: "in_progress"});
    const params = [
      path.join(config.installDir, "fourier/process_timespan_nodb.py"),
      config.device_id,
      job.station,
      job.fromTime,
      job.toTime,
      job.ad,
      key,
    ];
    if (job.ads) params.push(job.ads.join(","));
    console.log("ondemand: " + params.join(" "));

    // "error" and "exit" can both fire for the same child, so make sure the
    // completion logic runs exactly once per job.
    let settled = false;
    const finish = status => {
      if (settled) return;
      settled = true;
      working = false;
      proc = null;
      if (status) od_ref.child(key).update({status: status});
      console.log(status ? "done" : "failed", key);
      process_queue();
    };

    let child;
    try {
      child = spawn("python", params);
    } catch (err) {
      // spawn() rejected its arguments synchronously.
      console.log("ondemand error: " + err);
      finish(null);
      return;
    }
    proc = child;

    child.stdout.on("data", data => {
      console.log("ondemand data: " + data);
    });
    child.stderr.on("data", data => {
      console.log("ondemand error: " + data);
    });
    // Without this listener a missing or unlaunchable `python` binary would
    // surface as an unhandled "error" event and `working` would stay true.
    child.on("error", err => {
      console.log("ondemand error: " + err);
      finish(null);
    });
    child.on("exit", code => {
      finish(code === 2 ? "no_index" : "done");
    });
  }
  /**
   * Append a job to the pending queue, tag it with its database key and
   * kick the queue processor.
   *
   * @param {string} key - key of the job entry; stored on the job as `job.key`.
   * @param {Object} job - job record; it is mutated in place.
   */
  function addAndProc(key, job) {
    queue.push(job);
    // The processor reads the key back from the job itself.
    Object.assign(job, { key });
    process_queue();
  }
  /**
   * Begin serving on-demand jobs: reset the pending queue to an empty list
   * and enqueue every entry reported through "child_added" on `od_ref`.
   *
   * Only "child_added" is subscribed; a "child_changed" subscription
   * existed in earlier code but is disabled.
   */
  function start() {
    queue = [];
    const onJobAdded = snapshot => {
      console.log("added", snapshot.key);
      addAndProc(snapshot.key, snapshot.val());
    };
    od_ref.on("child_added", onJobAdded);
  }
  /**
   * Stop serving on-demand jobs: detach every listener registered on
   * `od_ref` and drop all pending jobs.
   *
   * A job that is already running is not interrupted. Its exit handler
   * calls process_queue() again, so the queue must stay an array here:
   * resetting it to a plain object would make that call throw on shift().
   */
  function stop() {
    od_ref.off();
    queue = [];
  }
  81. module.exports = {
  82. start: start,
  83. stop: stop,
  84. };