ondemand.js 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. const config = require("fourier-common").config;
  2. const db = require("fourier-common").fbconn.database();
  3. const spawn = require("child_process").spawn;
  4. const path = require("path");
  5. const express = require('express');
  6. const version = require('./package.json').version;
  7. const moment = require('moment');
  8. require('console-stamp')(console, 'HH:MM:ss.l');
  // True while a job is being handled; process_queue() uses it as a lock so
  // only one job runs at a time.
  let working = false;
  // Pending jobs in arrival order. This must be an array from the start:
  // the rest of the module calls shift(), push() and reads .length on it.
  let queue = [];
  // Database reference to devices/<device_id>/ondemand, where jobs are read
  // from and where their status is written back.
  const od_ref = db.ref("devices").child(config.device_id).child("ondemand");
  // Child process of the job currently running; unset/null when idle.
  let proc;
  /**
   * Runs the next queued on-demand job, one job at a time.
   *
   * Does nothing while another job is in flight (`working`). Otherwise it
   * takes the oldest job off `queue`:
   *  - a job whose status is already "done" or "no_index" is skipped, and the
   *    next job is tried after a one-second pause;
   *  - any other job is marked "in_progress" under `od_ref`, passed to
   *    `fourier/process_timespan_nodb.py` via `python`, and marked "done"
   *    when that process exits with code 0.
   * Whatever the outcome of the child process, the function calls itself
   * again so the rest of the queue keeps draining.
   */
  function process_queue() {
    if (working) return;
    working = true;

    // `queue` may not be an array if this runs outside a start()/stop()
    // window (e.g. a child exits after stop()); treat that as "empty".
    const job = Array.isArray(queue) ? queue.shift() : undefined;
    if (!job) {
      console.log("nothing left to do");
      working = false;
      return;
    }

    const key = job.key;
    if (job.status === "done" || job.status === "no_index") {
      // Nothing to run for this job; keep the lock for a second, then move on.
      setTimeout(() => {
        working = false;
        process_queue();
      }, 1000);
      return;
    }

    od_ref.child(key).update({
      status: "in_progress",
      startedRecognizing: moment().unix()
    });

    const params = [
      "-u",
      path.join(config.installDir, "fourier/process_timespan_nodb.py"),
      config.device_id,
      job.station,
      job.fromTime,
      job.toTime,
      job.ad,
      key,
    ];
    if (job.ads) params.push(job.ads.join(","));
    console.log("ondemand: " + params.join(" "));

    // A child can emit 'error' (e.g. the executable cannot be spawned) with
    // or without a following 'exit'. Both paths funnel through finish(), and
    // the flag makes sure the lock is released and the queue advanced only
    // once per job.
    let settled = false;
    const finish = code => {
      if (settled) return;
      settled = true;
      working = false;
      proc = null;
      if (code === 0) {
        od_ref.child(key).update({
          status: "done",
          endedRecognizing: moment().unix()
        });
        console.log("done", key);
      } else {
        console.error(`wrong process code: ${code}`);
      }
      process_queue();
    };

    proc = spawn("python", params);
    proc.stdout.on("data", data => {
      console.log("ondemand data: " + data);
    });
    proc.stderr.on("data", data => {
      console.error("ondemand error: " + data);
    });
    // Without this listener a failed spawn is an unhandled 'error' event,
    // which would take the whole process down and leave `working` stuck.
    proc.on("error", err => {
      console.error("ondemand spawn error: " + err);
      finish(null);
    });
    proc.on("exit", code => finish(code));
  }
  /**
   * Appends a job to the queue, tags it with its database key, and kicks the
   * queue processor.
   *
   * @param {string} key - Key the job is stored under; copied onto `job.key`.
   * @param {Object} job - Job record; it is mutated to carry `key`.
   */
  function addAndProc(key, job) {
    queue.push(job);
    Object.assign(job, { key });
    process_queue();
  }
  /**
   * Resets the queue and subscribes to jobs under `od_ref`, ordered by their
   * `fromTime` child. Each "child_added" snapshot is logged and handed to
   * addAndProc(). Changes to existing jobs ("child_changed") are not
   * subscribed to.
   */
  function start() {
    queue = [];

    const jobsByFromTime = od_ref.orderByChild('fromTime');
    const onJobAdded = snapshot => {
      const jobKey = snapshot.key;
      console.log("added", jobKey);
      addAndProc(jobKey, snapshot.val());
    };

    jobsByFromTime.on("child_added", onJobAdded);
  }
  // Small HTTP status endpoint: GET /info on port 49223 returns a JSON string
  // with the package version, process id, resident memory and queue length.
  const endpoint = express();
  endpoint.get('/info', function(request, response) {
    response.send(JSON.stringify({
      'version': version,
      // `process.pid` is the process id. `process.id` does not exist, so the
      // field used to be dropped from the JSON output altogether.
      'processID': process.pid,
      'memoryUsage': process.memoryUsage().rss,
      // Report 0 rather than omitting the field if `queue` is not an array.
      'queueSize': Array.isArray(queue) ? queue.length : 0
    }));
  });
  endpoint.listen(49223);
  /**
   * Detaches the listeners registered on `od_ref` and discards pending jobs.
   * A child process that is already running is left untouched.
   */
  function stop() {
    od_ref.off();
    // Reset to an empty array, not an object: a running job's exit handler
    // still calls process_queue(), which needs queue.shift() to exist.
    queue = [];
  }
  // Public API of this module: start and stop the on-demand job listener.
  module.exports = { start, stop };