ondemand.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. const config = require("fourier-common").config;
  2. const db = require("fourier-common").fbconn.database();
  3. const spawn = require("child_process").spawn;
  4. const path = require("path");
  5. const express = require('express');
  6. const version = require('./package.json').version;
  7. require('console-stamp')(console, 'HH:MM:ss.l');
  8. var working = false;
  9. var queue = {};
  10. var od_ref = db.ref("devices").child(config.device_id).child("ondemand");
  11. var proc;
  12. function process_queue() {
  13. if (working) return;
  14. working = true;
  15. //var key = Object.keys(queue)[0];
  16. //if (!key) return;
  17. //var job = queue[key];
  18. //delete queue[key];
  19. var job = queue.shift();
  20. if(!job) {
  21. console.log("nothing left to do");
  22. working = false;
  23. return;
  24. }
  25. var key = job.key;
  26. if (job.status == "done" || job.status === "no_index") {
  27. setTimeout(() => {
  28. working = false;
  29. process_queue();
  30. }, 1000);
  31. return;
  32. }
  33. od_ref.child(key).update({status: "in_progress"});
  34. var params = [
  35. path.join(config.installDir,"fourier/process_timespan_nodb.py"),
  36. config.device_id,
  37. job.station,
  38. job.fromTime,
  39. job.toTime,
  40. job.ad,
  41. key,
  42. ];
  43. if (job.ads) params.push(job.ads.join(","));
  44. console.log("ondemand: " + params.join(" "));
  45. proc = spawn("python", params);
  46. proc.stdout.on("data", data => {
  47. console.log("ondemand data: " + data);
  48. });
  49. proc.stderr.on("data", data => {
  50. console.log("ondemand error: " + data);
  51. });
  52. proc.on("exit", code => {
  53. working = false;
  54. proc = null;
  55. if (code === 0) {
  56. od_ref.child(key).update({status: "done"});
  57. console.log("done", key);
  58. } else {
  59. console.error(`wrong process code: ${code}`)
  60. }
  61. process_queue();
  62. });
  63. }
  64. function addAndProc(key, job) {
  65. //queue[key] = job;
  66. queue.push(job);
  67. job.key = key;
  68. process_queue();
  69. }
  70. function start() {
  71. queue = [];
  72. od_ref.orderByChild('fromTime').on("child_added", snp => {
  73. console.log("added", snp.key);
  74. addAndProc(snp.key, snp.val());
  75. });
  76. //od_ref.on("child_changed", snp => {
  77. // addAndProc(snp.key, snp.val());
  78. //});
  79. }
  80. let endpoint = express();
  81. endpoint.get('/info', function(request, response) {
  82. response.send(JSON.stringify({
  83. 'version': version,
  84. 'processID': process.id,
  85. 'memoryUsage': process.memoryUsage().rss,
  86. 'queueSize': queue.length
  87. }));
  88. });
  89. endpoint.listen(49223);
  90. function stop() {
  91. od_ref.off();
  92. queue = {};
  93. }
  94. module.exports = {
  95. start: start,
  96. stop: stop,
  97. };