Skip to content

Commit

Permalink
Merge pull request #254 from skadefro/master
Browse files Browse the repository at this point in the history
Close 1.4.29
  • Loading branch information
skadefro authored Oct 10, 2022
2 parents cc1b651 + 532da3f commit d17af51
Show file tree
Hide file tree
Showing 16 changed files with 796 additions and 56 deletions.
4 changes: 4 additions & 0 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class dbConfig extends Base {
public otel_trace_connection_ips: boolean;
public otel_trace_mongodb_per_users: boolean;
public otel_trace_mongodb_query_per_users: boolean;
public otel_trace_mongodb_count_per_users: boolean;
public otel_trace_mongodb_aggregate_per_users: boolean;
public otel_trace_mongodb_insert_per_users: boolean;
public otel_trace_mongodb_update_per_users: boolean;
Expand Down Expand Up @@ -139,6 +140,7 @@ export class dbConfig extends Base {
Config.otel_trace_connection_ips = Config.parseBoolean(conf.otel_trace_connection_ips ? conf.otel_trace_connection_ips : Config.getEnv("otel_trace_connection_ips", "false"));
Config.otel_trace_mongodb_per_users = Config.parseBoolean(conf.otel_trace_mongodb_per_users ? conf.otel_trace_mongodb_per_users : Config.getEnv("otel_trace_mongodb_per_users", "false"));
Config.otel_trace_mongodb_query_per_users = Config.parseBoolean(conf.otel_trace_mongodb_query_per_users ? conf.otel_trace_mongodb_query_per_users : Config.getEnv("otel_trace_mongodb_query_per_users", "false"));
Config.otel_trace_mongodb_count_per_users = Config.parseBoolean(conf.otel_trace_mongodb_count_per_users ? conf.otel_trace_mongodb_count_per_users : Config.getEnv("otel_trace_mongodb_query_per_users", "false"));
Config.otel_trace_mongodb_aggregate_per_users = Config.parseBoolean(conf.otel_trace_mongodb_aggregate_per_users ? conf.otel_trace_mongodb_aggregate_per_users : Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false"));
Config.otel_trace_mongodb_insert_per_users = Config.parseBoolean(conf.otel_trace_mongodb_insert_per_users ? conf.otel_trace_mongodb_insert_per_users : Config.getEnv("otel_trace_mongodb_insert_per_users", "false"));
Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(conf.otel_trace_mongodb_update_per_users ? conf.otel_trace_mongodb_update_per_users : Config.getEnv("otel_trace_mongodb_update_per_users", "false"));
Expand Down Expand Up @@ -366,6 +368,7 @@ export class Config {
Config.otel_trace_connection_ips = Config.parseBoolean(Config.getEnv("otel_trace_connection_ips", "false"));
Config.otel_trace_mongodb_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_per_users", "false"));
Config.otel_trace_mongodb_query_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_query_per_users", "false"));
Config.otel_trace_mongodb_count_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_count_per_users", "false"));
Config.otel_trace_mongodb_aggregate_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false"));
Config.otel_trace_mongodb_insert_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_insert_per_users", "false"));
Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false"));
Expand Down Expand Up @@ -576,6 +579,7 @@ export class Config {
public static otel_trace_connection_ips: boolean = Config.parseBoolean(Config.getEnv("otel_trace_connection_ips", "false"));
public static otel_trace_mongodb_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_per_users", "false"));
public static otel_trace_mongodb_query_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_query_per_users", "false"));
public static otel_trace_mongodb_count_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_count_per_users", "false"));
public static otel_trace_mongodb_aggregate_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false"));
public static otel_trace_mongodb_insert_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_insert_per_users", "false"));
public static otel_trace_mongodb_update_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false"));
Expand Down
219 changes: 184 additions & 35 deletions OpenFlow/src/DatabaseConnection.ts

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion OpenFlow/src/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { i_license_file, i_nodered_driver, i_otel } from "./commoninterfaces";
import { Config } from "./Config";
import { dockerdriver } from "./dockerdriver";
import { DBHelper } from './DBHelper';
const fs = require('fs');
const path = require('path');

const MAX_RETRIES_DEFAULT = 5
Expand Down Expand Up @@ -160,6 +161,37 @@ export class Logger {
if (Config.otel_warn_log) Logger.enabled["WebSocketServerClient"] = level.Warning;
if (Config.otel_err_log) Logger.enabled["WebSocketServerClient"] = level.Error;
}
static hasDockerEnv(): boolean {
try {
fs.statSync('/.dockerenv');
return true;
} catch (_) {
return false;
}
}
static hasDockerCGroup() {
try {
if (fs.readFileSync('/proc/self/cgroup', 'utf8').includes('docker')) return true;
return fs.readFileSync('/proc/self/cgroup', 'utf8').includes('/kubepods');
} catch (_) {
return false;
}
}
private static _isDocker: boolean = null;
public static isDocker(): boolean {
if (Logger._isDocker != null) return Logger._isDocker;
Logger._isDocker = Logger.hasDockerEnv() || Logger.hasDockerCGroup();
return false;
}
private static _isKubernetes: boolean = null;
public static isKubernetes(): boolean {
if (Logger._isKubernetes != null) return Logger._isKubernetes;
if (!Logger.isDocker()) { Logger._isKubernetes = false; return false; }
if (NoderedUtil.IsNullEmpty(process.env["KUBERNETES_SERVICE_HOST"])) { Logger._isKubernetes = false; return false; }
Logger._isKubernetes = true;
return true;
}

static configure(skipotel: boolean, skiplic: boolean): void {
Logger.DBHelper = new DBHelper();
Logger.reload()
Expand Down Expand Up @@ -197,7 +229,7 @@ export class Logger {
}

this.nodereddriver = null;
if (!NoderedUtil.isKubernetes() && NoderedUtil.isDocker()) {
if (!Logger.isKubernetes() && Logger.isDocker()) {
if (NoderedUtil.IsNullEmpty(process.env["KUBERNETES_SERVICE_HOST"])) {
try {
this.nodereddriver = new dockerdriver();
Expand Down
55 changes: 48 additions & 7 deletions OpenFlow/src/Messages/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Readable, Stream } from "stream";
import { GridFSBucket, ObjectId, Binary, FindCursor, GridFSFile, Filter } from "mongodb";
import * as path from "path";
import { DatabaseConnection } from "../DatabaseConnection";
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage, Ace, stripe_base } from "@openiap/openflow-api";
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage, Ace, stripe_base, CountMessage } from "@openiap/openflow-api";
import { stripe_customer, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription_item, stripe_coupon } from "@openiap/openflow-api";
import { amqpwrapper, QueueMessageOptions } from "../amqpwrapper";
import { WebSocketServerClient } from "../WebSocketServerClient";
Expand Down Expand Up @@ -79,6 +79,9 @@ export class Message {
case "query":
await this.Query(span);
break;
case "count":
await this.Count(span);
break;
case "getdocumentversion":
await this.GetDocumentVersion(span);
break;
Expand Down Expand Up @@ -310,6 +313,14 @@ export class Message {
cli.Send(this);
}
break;
case "count":
if (Config.enable_openflow_amqp) {
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
} else {
await this.Count(span);
cli.Send(this);
}
break;
case "getdocumentversion":
if (Config.enable_openflow_amqp) {
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
Expand Down Expand Up @@ -1118,6 +1129,36 @@ export class Message {
}
Logger.otel.endSpan(span);
}
private async Count(parent: Span): Promise<void> {
const span: Span = Logger.otel.startSubSpan("message.Count", parent);
this.Reply();
let msg: CountMessage
try {
msg = CountMessage.assign(this.data);
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; }
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
span?.recordException("Access denied, not signed in")
msg.error = "Access denied, not signed in";
} else {
const { query, collectionname, jwt, queryas } = msg;
msg.result = await Config.db.count({ query, collectionname, jwt, queryas }, span);
}
delete msg.query;
} catch (error) {
await handleError(null, error);
span?.recordException(error)
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
}
try {
this.data = JSON.stringify(msg);
} catch (error) {
this.data = "";
span?.recordException(error)
await handleError(null, error);
}
Logger.otel.endSpan(span);
}
private async GetDocumentVersion(parent: Span): Promise<void> {
const span: Span = Logger.otel.startSubSpan("message.GetDocumentVersion", parent);
this.Reply();
Expand Down Expand Up @@ -1433,7 +1474,7 @@ export class Message {
try {
msg = DeleteManyMessage.assign(this.data);
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; }
msg.affectedrows = await Config.db.DeleteMany(msg.query, msg.ids, msg.collectionname, null, msg.jwt, span);
msg.affectedrows = await Config.db.DeleteMany(msg.query, msg.ids, msg.collectionname, null, msg.recursive, msg.jwt, span);
delete msg.ids;
} catch (error) {
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
Expand Down Expand Up @@ -5161,15 +5202,15 @@ export class Message {
msg.result = await Config.db._UpdateOne(null, wiq as any, "mq", 1, true, jwt, parent);

if (msg.purge) {
await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, jwt, parent);
await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, false, jwt, parent);
var items = await Config.db.query<WorkitemQueue>({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent);
if (items.length > 0) {
}
items = await Config.db.query<WorkitemQueue>({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent);
if (items.length > 0) {
throw new Error("Failed purging workitemqueue " + wiq.name);
}
await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, jwt, parent);
await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, false, jwt, parent);
}
} catch (error) {
await handleError(null, error);
Expand Down Expand Up @@ -5208,15 +5249,15 @@ export class Message {
user = this.tuser;

if (msg.purge) {
await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, jwt, parent);
await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, false, jwt, parent);
var items = await Config.db.query<WorkitemQueue>({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent);
if (items.length > 0) {
items = await Config.db.query<WorkitemQueue>({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent);
}
if (items.length > 0) {
throw new Error("Failed purging workitemqueue " + wiq.name);
}
await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, jwt, parent);
await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, false, jwt, parent);
} else {
var items = await Config.db.query<WorkitemQueue>({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent);
if (items.length > 0) {
Expand Down Expand Up @@ -5255,7 +5296,7 @@ export class Message {
switch (msg.command) {
case "dumpwebsocketclients":
if (!this.tuser.HasRoleId(WellknownIds.admins)) throw new Error("Access denied");
await Config.db.DeleteMany({ "_type": "websocketclient" }, null, "websocketclients", null, jwt, parent);
await Config.db.DeleteMany({ "_type": "websocketclient" }, null, "websocketclients", null, false, jwt, parent);
amqpwrapper.Instance().send("openflow", "", { "command": "dumpwebsocketclients" }, 10000, null, "", 1);
break;
case "killwebsocketclient":
Expand Down
2 changes: 1 addition & 1 deletion OpenFlow/src/OAuthProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ export class Account {
return token.item;
}
static async RemoveTokenRequest(code: string, parent: Span) {
let tokens = await Config.db.DeleteMany({ _type: "tokenrequest", "code": code }, null, "oauthtokens", null, Crypt.rootToken(), parent);
let tokens = await Config.db.DeleteMany({ _type: "tokenrequest", "code": code }, null, "oauthtokens", null, false, Crypt.rootToken(), parent);
return tokens[0];
}
}
5 changes: 3 additions & 2 deletions OpenFlowNodeRED/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openiap/nodered",
"version": "1.4.28",
"version": "1.4.29",
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
"main": "index.js",
"scripts": {
Expand All @@ -25,7 +25,7 @@
},
"dependencies": {
"@nodemailer/mailparser2": "^1.0.3",
"@openiap/openflow-api": "^2.0.20",
"@openiap/openflow-api": "^2.1.2",
"@opentelemetry/api-metrics": "^0.32.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0",
Expand All @@ -46,6 +46,7 @@
"passport-saml": "^3.2.1",
"passport-saml-metadata": "^2.6.0",
"pm2": "^5.2.0",
"ramda": "^0.28.0",
"readline-sync": "^1.4.10",
"request": "^2.88.2",
"smtp-server": "^3.11.0"
Expand Down
2 changes: 2 additions & 0 deletions OpenFlowNodeRED/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class Config {
public static reload(): void {
Config.getversion();
Config.logpath = Config.getEnv("logpath", __dirname);
Config.enable_file_cache = Config.parseBoolean(Config.getEnv("log_with_colors", "true"));
Config.log_with_colors = Config.parseBoolean(Config.getEnv("log_with_colors", "true"));
Config.log_with_trace = Config.parseBoolean(Config.getEnv("log_with_trace", "false"));

Expand Down Expand Up @@ -99,6 +100,7 @@ export class Config {
public static version: string = Config.getversion();
public static unittesting: boolean = false;
public static logpath: string = Config.getEnv("logpath", __dirname);
public static enable_file_cache: boolean = Config.parseBoolean(Config.getEnv("log_with_colors", "true"));
public static log_with_trace: boolean = Config.parseBoolean(Config.getEnv("log_with_trace", "false"));
public static log_with_colors: boolean = Config.parseBoolean(Config.getEnv("log_with_colors", "true"));
public static log_information: boolean = Config.parseBoolean(Config.getEnv("log_information", "true"));
Expand Down
Loading

0 comments on commit d17af51

Please sign in to comment.