From 3a633ecf79a8bb124cb3aa2b1f0584225a3bac5f Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Fri, 7 Oct 2022 13:20:17 +0200 Subject: [PATCH 1/4] Clear user from cache on delete --- OpenFlow/src/DatabaseConnection.ts | 83 +++++++++++++++++++++--------- OpenFlowNodeRED/package.json | 2 +- VERSION | 2 +- package.json | 2 +- 4 files changed, 62 insertions(+), 27 deletions(-) diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index 3c9d4cda..d47d81ca 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -322,6 +322,7 @@ export class DatabaseConnection extends events.EventEmitter { } registerGlobalWatch(collectionname: string, parent: Span) { if (!this.registerGlobalWatches) return; + if (collectionname == "cvr") return; const span: Span = Logger.otel.startSubSpan("registerGlobalWatch", parent); try { span?.setAttribute("collectionname", collectionname); @@ -354,6 +355,15 @@ export class DatabaseConnection extends events.EventEmitter { if (collectionname == "config" && NoderedUtil.IsNullUndefinded(item)) { item = await this.GetLatestDocumentVersion({ collectionname, id: _id, jwt: Crypt.rootToken() }, null); } + // if (next.operationType == 'delete' && collectionname == "users") { + // item = await this.GetLatestDocumentVersion({ collectionname, id: _id, jwt: Crypt.rootToken() }, null); + // if (!NoderedUtil.IsNullUndefinded(item)) { + // if (!NoderedUtil.IsNullEmpty(item.username)) await Logger.DBHelper.memoryCache.del("username_" + item.username); + // await Logger.DBHelper.memoryCache.del("users" + _id); + // await Logger.DBHelper.memoryCache.del("userroles_" + _id); + // if (item._type == "role") await Logger.DBHelper.memoryCache.del("rolename_" + item.username); + // } + // } if (!NoderedUtil.IsNullUndefinded(item)) { _type = item._type; @@ -1662,11 +1672,12 @@ export class DatabaseConnection extends events.EventEmitter { if (item._type === "role") { const r: Role = (item as any); if (r.members.length > 0) { - if (Config.enable_openflow_amqp && !Config.supports_watch) { - amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + // we clear all since we might have cached tons of userrole mappings Logger.DBHelper.clearCache("insertone in " + collectionname + " collection for a " + item._type + " object"); - } + } else if (Config.enable_openflow_amqp) { + amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); + } } } } @@ -1772,11 +1783,12 @@ export class DatabaseConnection extends events.EventEmitter { if (item._type === "role") { const r: Role = item as any; if (r.members.length > 0) { - if (Config.enable_openflow_amqp && !Config.supports_watch) { - amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + // we clear all since we might have cached tons of userrole mappings Logger.DBHelper.clearCache("insertmany in " + collectionname + " collection for a " + item._type + " object"); - } + } else if (Config.enable_openflow_amqp) { + amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); + } } } } @@ -2240,22 +2252,24 @@ export class DatabaseConnection extends events.EventEmitter { // DBHelper.cached_roles = []; } if 
(q.item._type === "role" && q.collectionname === "users") { - if (Config.enable_openflow_amqp && !Config.supports_watch) { - amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { Logger.DBHelper.clearCache("updateone in " + q.collectionname + " collection for a " + q.item._type + " object"); - } + } else if (Config.enable_openflow_amqp) { + amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); + } } if (q.collectionname === "mq") { if (!NoderedUtil.IsNullEmpty(q.item.name)) { if (q.item._type == "exchange") q.item.name = q.item.name.toLowerCase(); if (q.item._type == "queue") q.item.name = q.item.name.toLowerCase(); } - if (Config.enable_openflow_amqp && !Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + await Logger.DBHelper.memoryCache.del("mq" + q.item._id); + if (q.item._type == "queue") await Logger.DBHelper.memoryCache.del("queuename_" + q.item.name); + if (q.item._type == "exchange") await Logger.DBHelper.memoryCache.del("exchangename_" + q.item.name); + } else if (Config.enable_openflow_amqp) { amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { - Logger.DBHelper.clearCache("updateone in " + q.collectionname + " collection for a " + q.item._type + " object"); - } + } } if (!DatabaseConnection.usemetadata(q.collectionname)) { try { @@ -2907,6 +2921,21 @@ export class DatabaseConnection extends events.EventEmitter { for (var r of subdocs) { this.DeleteOne(r._id, "users", false, jwt, span); } + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + // @ts-ignore + if (!NoderedUtil.IsNullEmpty(doc.username)) { + // @ts-ignore + await Logger.DBHelper.memoryCache.del("username_" + doc.username); + // @ts-ignore + await Logger.DBHelper.memoryCache.del("federation_" + doc.username); + } + await Logger.DBHelper.memoryCache.del("users" + doc._id); + await Logger.DBHelper.memoryCache.del("userroles_" + doc._id); + + } else if (Config.enable_openflow_amqp) { + amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); + } + if (Config.cleanup_on_delete_user || recursive) { let skip_collections = []; if (!NoderedUtil.IsNullEmpty(Config.housekeeping_skip_collections)) skip_collections = Config.housekeeping_skip_collections.split(",") @@ -2944,18 +2973,24 @@ export class DatabaseConnection extends events.EventEmitter { } } if (collectionname == "users" && doc._type == "role") { - if (Config.enable_openflow_amqp && !Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + // we clear all since we might have cached tons of userrole mappings + Logger.DBHelper.clearCache("deleted role " + doc.name); + // await Logger.DBHelper.memoryCache.del("users" + doc._id); + // await Logger.DBHelper.memoryCache.del("rolename_" + doc.name); + // await Logger.DBHelper.memoryCache.del("allroles"); + } else if (Config.enable_openflow_amqp) { amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { - Logger.DBHelper.clearCache("deleteone in " + collectionname + " collection for a " + doc._type + " object"); - } + } } if (collectionname === "mq") { - if (Config.enable_openflow_amqp && 
!Config.supports_watch) { + if (Config.cache_store_type == "redis" || Config.cache_store_type == "mongodb") { + await Logger.DBHelper.memoryCache.del("mq" + doc._id); + if (doc._type == "queue") await Logger.DBHelper.memoryCache.del("queuename_" + doc.name); + if (doc._type == "exchange") await Logger.DBHelper.memoryCache.del("exchangename_" + doc.name); + } else if (Config.enable_openflow_amqp) { amqpwrapper.Instance().send("openflow", "", { "command": "clearcache" }, 20000, null, "", 1); - } else if (!Config.supports_watch) { - Logger.DBHelper.clearCache("deleteone in " + collectionname + " collection for a " + doc._type + " object"); - } + } } if (collectionname === "config" && doc._type === "provider" && !Config.supports_watch) { await Logger.DBHelper.ClearProviders(); diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index 77d1b120..79642170 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/nodered", - "version": "1.4.28", + "version": "1.4.29", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { diff --git a/VERSION b/VERSION index dd1484fe..06e37f38 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.4.28 \ No newline at end of file +1.4.29 \ No newline at end of file diff --git a/package.json b/package.json index b8aaf337..3aaa6910 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.4.28", + "version": "1.4.29", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From aaaaa930f3bca0bbccb5592a25e0d9a482de9a0e Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Sat, 8 Oct 2022 18:30:04 +0200 Subject: [PATCH 2/4] add count --- OpenFlow/src/DatabaseConnection.ts | 125 +++++++++++++++++++++++++++-- OpenFlow/src/Messages/Message.ts | 43 +++++++++- 2 files changed, 159 insertions(+), 9 deletions(-) diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index d47d81ca..78997679 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -1,7 +1,7 @@ import { MongoClient, ObjectId, Db, Binary, GridFSBucket, ChangeStream, MongoClientOptions, AggregateOptions, InsertOneOptions, InsertOneResult, UpdateOptions } from "mongodb"; import { Crypt } from "./Crypt"; import { Config, dbConfig } from "./Config"; -import { TokenUser, Base, WellknownIds, Rights, NoderedUtil, mapFunc, finalizeFunc, reduceFunc, Ace, UpdateOneMessage, UpdateManyMessage, InsertOrUpdateOneMessage, Role, Rolemember, User, Customer, WatchEventMessage, Workitem, WorkitemQueue, QueryOptions } from "@openiap/openflow-api"; +import { TokenUser, Base, WellknownIds, Rights, NoderedUtil, mapFunc, finalizeFunc, reduceFunc, Ace, UpdateOneMessage, UpdateManyMessage, InsertOrUpdateOneMessage, Role, Rolemember, User, Customer, WatchEventMessage, Workitem, WorkitemQueue, QueryOptions, CountOptions } from "@openiap/openflow-api"; import { OAuthProvider } from "./OAuthProvider"; import { ObservableUpDownCounter, Histogram } from "@opentelemetry/api-metrics" import { Span } from "@opentelemetry/api"; @@ -46,6 +46,7 @@ export class DatabaseConnection extends events.EventEmitter { private _dbname: 
string; // public static ot_mongodb_query_count: Counter; public static mongodb_query: Histogram; + public static mongodb_count: Histogram; public static mongodb_aggregate: Histogram; public static mongodb_insert: Histogram; public static mongodb_insertmany: Histogram; @@ -73,6 +74,9 @@ export class DatabaseConnection extends events.EventEmitter { DatabaseConnection.mongodb_query = Logger.otel.meter.createHistogram('openflow_mongodb_query_seconds', { description: 'Duration for mongodb queries', valueType: 1, unit: 's' }); + DatabaseConnection.mongodb_count = Logger.otel.meter.createHistogram('openflow_mongodb_count_seconds', { + description: 'Duration for mongodb counts', valueType: 1, unit: 's' + }); // valueType: ValueType.DOUBLE DatabaseConnection.mongodb_aggregate = Logger.otel.meter.createHistogram('openflow_mongodb_aggregate_seconds', { description: 'Duration for mongodb aggregates', valueType: 1, unit: 's' }); @@ -991,6 +995,99 @@ export class DatabaseConnection extends events.EventEmitter { Logger.otel.endSpan(span); } } + + /** + * Count documents in the database matching a query. + * @param {any} query MongoDB Query + * @param {string} collectionname What collection to count documents in + * @param {string} jwt JWT of user who is making the query, to limit results based on permissions + * @returns Promise with the number of matching documents + */ + // tslint:disable-next-line: max-line-length + async count(options: CountOptions, parent: Span): Promise<number> { + let { query, collectionname, jwt, queryas } = Object.assign({ + }, options); + const span: Span = Logger.otel.startSubSpan("db.count", parent); + let _query: Object = {}; + try { + await this.connect(span); + if (query !== null && query !== undefined) { + span?.addEvent("parse query"); + let json: any = query; + if (typeof json !== 'string' && !(json instanceof String)) { + json = JSON.stringify(json, (key, value) => { + if (value instanceof RegExp) + return ("__REGEXP " + value.toString()); + else + return value; + }); + } + query = JSON.parse(json, (key, value) => { + if (typeof value === 'string' && value.match(isoDatePattern)) { + return new Date(value); // isostring, so cast to js date + } else if (value != null && value != undefined && value.toString().indexOf("__REGEXP ") === 0) { + const m = value.split("__REGEXP ")[1].match(/\/(.*)\/(.*)?/); + return new RegExp(m[1], m[2] || ""); + } else + return value; // leave any other value as-is + }); + if (Config.otel_trace_include_query) span?.setAttribute("query", JSON.stringify(query)); + } + if (NoderedUtil.IsNullUndefinded(query)) { + throw new Error("Query is mandatory"); + } + const keys: string[] = Object.keys(query); + for (let key of keys) { + if (key === "_id") { + const id: string = query._id; + const safeid = safeObjectID(id); + if (safeid !== null && safeid !== undefined) { + delete query._id; + query.$or = [{ _id: id }, { _id: safeObjectID(id) }]; + } + } + } + span?.addEvent("verityToken"); + const user: TokenUser = await Crypt.verityToken(jwt); + + span?.addEvent("getbasequery"); + if (collectionname === "files") { collectionname = "fs.files"; } + if (DatabaseConnection.usemetadata(collectionname)) { + let impersonationquery; + if (!NoderedUtil.IsNullEmpty(queryas)) impersonationquery = await this.getbasequeryuserid(user, queryas, "metadata._acl", [Rights.read], span); + if (!NoderedUtil.IsNullEmpty(queryas) && !NoderedUtil.IsNullUndefinded(impersonationquery)) { + _query = { $and: [query, this.getbasequery(user, "metadata._acl", [Rights.read]), impersonationquery] }; + } else { + _query = { $and: [query,
this.getbasequery(user, "metadata._acl", [Rights.read])] }; + } + } else { + let impersonationquery: any; + if (!NoderedUtil.IsNullEmpty(queryas)) impersonationquery = await this.getbasequeryuserid(user, queryas, "_acl", [Rights.read], span) + if (!NoderedUtil.IsNullEmpty(queryas) && !NoderedUtil.IsNullUndefinded(impersonationquery)) { + _query = { $and: [query, this.getbasequery(user, "_acl", [Rights.read]), impersonationquery] }; + } else { + _query = { $and: [query, this.getbasequery(user, "_acl", [Rights.read])] }; + } + } + span?.setAttribute("collection", collectionname); + span?.setAttribute("username", user.username); + const ot_end = Logger.otel.startTimer(); + const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.find", span); + // @ts-ignore + let result = await this.db.collection(collectionname).countDocuments(_query); + mongodbspan?.setAttribute("results", result); + Logger.otel.endSpan(mongodbspan); + Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_count, DatabaseConnection.otel_label(collectionname, user, "count")); + Logger.instanse.debug("DatabaseConnection", "count", "[" + user.username + "][" + collectionname + "] count gave " + result + " results "); + return result; + } catch (error) { + Logger.instanse.error("DatabaseConnection", "count", "[" + collectionname + "] count error " + (error.message ? error.message : error)); + span?.recordException(error); + throw error; + } finally { + Logger.otel.endSpan(span); + } + } async GetLatestDocumentVersion(options: GetLatestDocumentVersionOptions, parent: Span): Promise { let { collectionname, id, jwt, decrypt } = Object.assign({ decrypt: true @@ -3364,12 +3461,22 @@ export class DatabaseConnection extends events.EventEmitter { var field = Config.text_index_name_fields[i]; if (Array.isArray(item[field])) { for (var y = 0; y < item[field].length; y++) { - if (!NoderedUtil.IsNullEmpty(item[field][y])) { - var name: string = item[field][y].toLowerCase(); - name = name.replace(/[.*!#"'`|%$@+\-?^${}()|[\]\\]/g, " ").trim(); - _searchnames = _searchnames.concat(name.split(" ")); - _searchnames.push(name); - if (name != item[field][y].toLowerCase()) _searchnames.push(item[field][y].toLowerCase()); + try { + if (!NoderedUtil.IsNullEmpty(item[field][y])) { + var name: string = item[field][y].toLowerCase(); + name = name.replace(/[.*!#"'`|%$@+\-?^${}()|[\]\\]/g, " ").trim(); + _searchnames = _searchnames.concat(name.split(" ")); + _searchnames.push(name); + if (name != item[field][y].toLowerCase()) _searchnames.push(item[field][y].toLowerCase()); + } + } catch (error) { + Logger.instanse.error("DatabaseConnection", "ensureResource", error); + if (item[field] && item[field][y]) { + console.log(field + "/" + y, item[field][y]); + } else { + console.log(field, item[field]); + } + } } } else { @@ -4087,11 +4194,13 @@ export class DatabaseConnection extends events.EventEmitter { } return false; } - static otel_label(collectionname: string, user: TokenUser | User, action: "query" | "aggregate" | "insert" | "insertmany" | "update" | "updatemany" | "replace" | "delete" | "deletemany") { + static otel_label(collectionname: string, user: TokenUser | User, action: "query" | "count" | "aggregate" | "insert" | "insertmany" | "update" | "updatemany" | "replace" | "delete" | "deletemany") { if (Config.otel_trace_mongodb_per_users) { return { collection: collectionname, username: user.username }; } else if (Config.otel_trace_mongodb_query_per_users && action == "query") { return { collection: collectionname, username: user.username }; + } 
else if (Config.otel_trace_mongodb_count_per_users && action == "count") { + return { collection: collectionname, username: user.username }; } else if (Config.otel_trace_mongodb_aggregate_per_users && action == "aggregate") { return { collection: collectionname, username: user.username }; } else if (Config.otel_trace_mongodb_insert_per_users && (action == "insert" || action == "insertmany")) { diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index 94388aa3..b1bb47d4 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -10,7 +10,7 @@ import { Readable, Stream } from "stream"; import { GridFSBucket, ObjectId, Binary, FindCursor, GridFSFile, Filter } from "mongodb"; import * as path from "path"; import { DatabaseConnection } from "../DatabaseConnection"; -import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage, Ace, stripe_base } from "@openiap/openflow-api"; +import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage, Ace, stripe_base, CountMessage } from "@openiap/openflow-api"; import { stripe_customer, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription_item, stripe_coupon } from "@openiap/openflow-api"; import { amqpwrapper, QueueMessageOptions } from "../amqpwrapper"; import { WebSocketServerClient } from "../WebSocketServerClient"; @@ -79,6 +79,9 @@ export class Message { case "query": await this.Query(span); break; + case "count": + await this.Count(span); + break; case "getdocumentversion": await this.GetDocumentVersion(span); break; @@ -310,6 +313,14 @@ export 
class Message { cli.Send(this); } break; + case "count": + if (Config.enable_openflow_amqp) { + cli.Send(await QueueClient.SendForProcessing(this, this.priority)); + } else { + await this.Count(span); + cli.Send(this); + } + break; case "getdocumentversion": if (Config.enable_openflow_amqp) { cli.Send(await QueueClient.SendForProcessing(this, this.priority)); @@ -1118,6 +1129,36 @@ export class Message { } Logger.otel.endSpan(span); } + private async Count(parent: Span): Promise { + const span: Span = Logger.otel.startSubSpan("message.Count", parent); + this.Reply(); + let msg: CountMessage + try { + msg = CountMessage.assign(this.data); + if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; } + if (NoderedUtil.IsNullEmpty(msg.jwt)) { + span?.recordException("Access denied, not signed in") + msg.error = "Access denied, not signed in"; + } else { + const { query, collectionname, jwt, queryas } = msg; + msg.result = await Config.db.count({ query, collectionname, jwt, queryas }, span); + } + delete msg.query; + } catch (error) { + await handleError(null, error); + span?.recordException(error) + if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; } + if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error; + } + try { + this.data = JSON.stringify(msg); + } catch (error) { + this.data = ""; + span?.recordException(error) + await handleError(null, error); + } + Logger.otel.endSpan(span); + } private async GetDocumentVersion(parent: Span): Promise { const span: Span = Logger.otel.startSubSpan("message.GetDocumentVersion", parent); this.Reply(); From bf93c8bf08105c9146dea08b5d63b9152fe4d7f5 Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Sat, 8 Oct 2022 18:30:36 +0200 Subject: [PATCH 3/4] move file-system-cache out of api --- OpenFlow/src/Config.ts | 4 + OpenFlow/src/Logger.ts | 34 +- OpenFlowNodeRED/package.json | 3 +- OpenFlowNodeRED/src/Config.ts | 2 + .../src/file-system-cache/funcs.ts | 126 +++++++ .../src/file-system-cache/index.ts | 317 ++++++++++++++++++ OpenFlowNodeRED/src/index.ts | 14 +- .../src/node-red-contrib-auth-saml.ts | 2 +- .../src/node-red-contrib-openflow-storage.ts | 30 +- package.json | 2 +- test/DatabaseConnection.test.ts | 10 + test/workitemqueue.test.ts | 26 +- 12 files changed, 560 insertions(+), 10 deletions(-) create mode 100644 OpenFlowNodeRED/src/file-system-cache/funcs.ts create mode 100644 OpenFlowNodeRED/src/file-system-cache/index.ts diff --git a/OpenFlow/src/Config.ts b/OpenFlow/src/Config.ts index 39f79396..3b51964c 100644 --- a/OpenFlow/src/Config.ts +++ b/OpenFlow/src/Config.ts @@ -60,6 +60,7 @@ export class dbConfig extends Base { public otel_trace_connection_ips: boolean; public otel_trace_mongodb_per_users: boolean; public otel_trace_mongodb_query_per_users: boolean; + public otel_trace_mongodb_count_per_users: boolean; public otel_trace_mongodb_aggregate_per_users: boolean; public otel_trace_mongodb_insert_per_users: boolean; public otel_trace_mongodb_update_per_users: boolean; @@ -139,6 +140,7 @@ export class dbConfig extends Base { Config.otel_trace_connection_ips = Config.parseBoolean(conf.otel_trace_connection_ips ? conf.otel_trace_connection_ips : Config.getEnv("otel_trace_connection_ips", "false")); Config.otel_trace_mongodb_per_users = Config.parseBoolean(conf.otel_trace_mongodb_per_users ? conf.otel_trace_mongodb_per_users : Config.getEnv("otel_trace_mongodb_per_users", "false")); Config.otel_trace_mongodb_query_per_users = Config.parseBoolean(conf.otel_trace_mongodb_query_per_users ? 
conf.otel_trace_mongodb_query_per_users : Config.getEnv("otel_trace_mongodb_query_per_users", "false")); + Config.otel_trace_mongodb_count_per_users = Config.parseBoolean(conf.otel_trace_mongodb_count_per_users ? conf.otel_trace_mongodb_count_per_users : Config.getEnv("otel_trace_mongodb_count_per_users", "false")); Config.otel_trace_mongodb_aggregate_per_users = Config.parseBoolean(conf.otel_trace_mongodb_aggregate_per_users ? conf.otel_trace_mongodb_aggregate_per_users : Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false")); Config.otel_trace_mongodb_insert_per_users = Config.parseBoolean(conf.otel_trace_mongodb_insert_per_users ? conf.otel_trace_mongodb_insert_per_users : Config.getEnv("otel_trace_mongodb_insert_per_users", "false")); Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(conf.otel_trace_mongodb_update_per_users ? conf.otel_trace_mongodb_update_per_users : Config.getEnv("otel_trace_mongodb_update_per_users", "false")); @@ -366,6 +368,7 @@ export class Config { Config.otel_trace_connection_ips = Config.parseBoolean(Config.getEnv("otel_trace_connection_ips", "false")); Config.otel_trace_mongodb_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_per_users", "false")); Config.otel_trace_mongodb_query_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_query_per_users", "false")); + Config.otel_trace_mongodb_count_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_count_per_users", "false")); Config.otel_trace_mongodb_aggregate_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false")); Config.otel_trace_mongodb_insert_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_insert_per_users", "false")); Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false")); @@ -576,6 +579,7 @@ export class Config { public static otel_trace_connection_ips: boolean = Config.parseBoolean(Config.getEnv("otel_trace_connection_ips", "false")); public static otel_trace_mongodb_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_per_users", "false")); public static otel_trace_mongodb_query_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_query_per_users", "false")); + public static otel_trace_mongodb_count_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_count_per_users", "false")); public static otel_trace_mongodb_aggregate_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_aggregate_per_users", "false")); public static otel_trace_mongodb_insert_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_insert_per_users", "false")); public static otel_trace_mongodb_update_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false")); diff --git a/OpenFlow/src/Logger.ts b/OpenFlow/src/Logger.ts index 4aefa466..f349f1ad 100644 --- a/OpenFlow/src/Logger.ts +++ b/OpenFlow/src/Logger.ts @@ -3,6 +3,7 @@ import { i_license_file, i_nodered_driver, i_otel } from "./commoninterfaces"; import { Config } from "./Config"; import { dockerdriver } from "./dockerdriver"; import { DBHelper } from './DBHelper'; +const fs = require('fs'); const path = require('path'); const MAX_RETRIES_DEFAULT = 5 @@ -160,6 +161,37 @@ export class Logger { if (Config.otel_warn_log) Logger.enabled["WebSocketServerClient"] = level.Warning; if (Config.otel_err_log)
Logger.enabled["WebSocketServerClient"] = level.Error; } + static hasDockerEnv(): boolean { + try { + fs.statSync('/.dockerenv'); + return true; + } catch (_) { + return false; + } + } + static hasDockerCGroup() { + try { + if (fs.readFileSync('/proc/self/cgroup', 'utf8').includes('docker')) return true; + return fs.readFileSync('/proc/self/cgroup', 'utf8').includes('/kubepods'); + } catch (_) { + return false; + } + } + private static _isDocker: boolean = null; + public static isDocker(): boolean { + if (Logger._isDocker != null) return Logger._isDocker; + Logger._isDocker = Logger.hasDockerEnv() || Logger.hasDockerCGroup(); + return false; + } + private static _isKubernetes: boolean = null; + public static isKubernetes(): boolean { + if (Logger._isKubernetes != null) return Logger._isKubernetes; + if (!Logger.isDocker()) { Logger._isKubernetes = false; return false; } + if (NoderedUtil.IsNullEmpty(process.env["KUBERNETES_SERVICE_HOST"])) { Logger._isKubernetes = false; return false; } + Logger._isKubernetes = true; + return true; + } + static configure(skipotel: boolean, skiplic: boolean): void { Logger.DBHelper = new DBHelper(); Logger.reload() @@ -197,7 +229,7 @@ export class Logger { } this.nodereddriver = null; - if (!NoderedUtil.isKubernetes() && NoderedUtil.isDocker()) { + if (!Logger.isKubernetes() && Logger.isDocker()) { if (NoderedUtil.IsNullEmpty(process.env["KUBERNETES_SERVICE_HOST"])) { try { this.nodereddriver = new dockerdriver(); diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index 79642170..8afcf028 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -25,7 +25,7 @@ }, "dependencies": { "@nodemailer/mailparser2": "^1.0.3", - "@openiap/openflow-api": "^2.0.20", + "@openiap/openflow-api": "^2.1.1", "@opentelemetry/api-metrics": "^0.32.0", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0", @@ -46,6 +46,7 @@ "passport-saml": "^3.2.1", "passport-saml-metadata": "^2.6.0", "pm2": "^5.2.0", + "ramda": "^0.28.0", "readline-sync": "^1.4.10", "request": "^2.88.2", "smtp-server": "^3.11.0" diff --git a/OpenFlowNodeRED/src/Config.ts b/OpenFlowNodeRED/src/Config.ts index 1f51a8b4..e5e0d06e 100644 --- a/OpenFlowNodeRED/src/Config.ts +++ b/OpenFlowNodeRED/src/Config.ts @@ -17,6 +17,7 @@ export class Config { public static reload(): void { Config.getversion(); Config.logpath = Config.getEnv("logpath", __dirname); + Config.enable_file_cache = Config.parseBoolean(Config.getEnv("log_with_colors", "true")); Config.log_with_colors = Config.parseBoolean(Config.getEnv("log_with_colors", "true")); Config.log_with_trace = Config.parseBoolean(Config.getEnv("log_with_trace", "false")); @@ -99,6 +100,7 @@ export class Config { public static version: string = Config.getversion(); public static unittesting: boolean = false; public static logpath: string = Config.getEnv("logpath", __dirname); + public static enable_file_cache: boolean = Config.parseBoolean(Config.getEnv("log_with_colors", "true")); public static log_with_trace: boolean = Config.parseBoolean(Config.getEnv("log_with_trace", "false")); public static log_with_colors: boolean = Config.parseBoolean(Config.getEnv("log_with_colors", "true")); public static log_information: boolean = Config.parseBoolean(Config.getEnv("log_information", "true")); diff --git a/OpenFlowNodeRED/src/file-system-cache/funcs.ts b/OpenFlowNodeRED/src/file-system-cache/funcs.ts new file mode 100644 index 00000000..7badc05e --- /dev/null +++ 
b/OpenFlowNodeRED/src/file-system-cache/funcs.ts @@ -0,0 +1,126 @@ +/* +The MIT License (MIT) + +Copyright (c) 2015 Phil Cockfield (https://github.com/philcockfield) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +let R: any = null; +try { + R = require("ramda"); +} catch (error) { +} +let fs: any = null; +try { + fs = require("fs"); +} catch (error) { +} +let fsPath: any = null; +try { + fsPath = require("path"); +} catch (error) { +} +let crypto: any = null; +try { + crypto = require("crypto"); +} catch (error) { +} + +export const isNothing = (value) => R.isNil(value) || R.isEmpty(value); +export const isString = R.is(String); +export const compact = R.pipe(R.flatten, R.reject(R.isNil)); +export const toStringArray = R.pipe(compact, R.map(R.toString)); +export const toAbsolutePath = (path) => path.startsWith('.') ? fsPath.resolve(path) : path; +export const ensureString = R.curry( + (defaultValue, text) => R.is(String, text) ? text : defaultValue +); + + +export const isFileSync = (path) => { + if (fs.existsSync(path)) { + return fs.lstatSync(path).isFile(); + } + return false; +}; + + +export const readFileSync = (path) => { + if (fs.existsSync(path)) { + return fs.readFileSync(path).toString(); + } +}; + + +export const existsP = (path) => new Promise((resolve) => { + fs.exists(path, (exists) => resolve(exists)); +}); + + +export const removeFileP = (path) => new Promise((resolve, reject) => { + existsP(path) + .then((exists) => { + if (exists) { + fs.unlink(path, (err) => { + if (err) { reject(err); } else { resolve(); } + }); + } else { + resolve(); + } + }); +}); + + +export const filePathsP = (basePath, ns) => new Promise((resolve, reject) => { + existsP(basePath) + .then(exists => { + if (!exists) { resolve([]); return; } + fs.readdir(basePath, (err, fileNames) => { + if (err) { + reject(err); + } else { + const paths = R.pipe( + compact, + R.filter((name: string) => ns ? name.startsWith(ns) : true), + R.filter((name: string) => !ns ? !R.includes('-')(name) : true), + R.map(name => `${basePath}/${name}`) + )(fileNames); + resolve(paths); + } + }); + }); +}); + + +/** + * Turns a set of values into a HEX hash code. + * @param values: The set of values to hash. + * @return {String} or undefined. 
+ */ +export const hash = (...values) => { + if (R.pipe(compact, R.isEmpty)(values)) { return undefined; } + const resultHash = crypto.createHash('md5'); + const addValue = value => resultHash.update(value); + const addValues = R.forEach(addValue); + R.pipe( + toStringArray, + addValues + )(values); + return resultHash.digest('hex'); +}; \ No newline at end of file diff --git a/OpenFlowNodeRED/src/file-system-cache/index.ts b/OpenFlowNodeRED/src/file-system-cache/index.ts new file mode 100644 index 00000000..027a934e --- /dev/null +++ b/OpenFlowNodeRED/src/file-system-cache/index.ts @@ -0,0 +1,317 @@ +/* +The MIT License (MIT) + +Copyright (c) 2015 Phil Cockfield (https://github.com/philcockfield) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +import * as f from './funcs'; + +let R: any = null; +try { + R = require("ramda"); +} catch (error) { +} +let p: any = null; +try { + p = require("path"); +} catch (error) { +} +let fs: any = null; +try { + fs = require("fs"); +} catch (error) { +} + + +const formatPath = R.pipe(f.ensureString('./.cache'), f.toAbsolutePath); + +const toGetValue = (data) => { + const { type } = data; + let { value } = data; + if (type === 'Date') { value = new Date(value); } + return value; +}; + + +const toJson = (value) => JSON.stringify({ value, type: R.type(value) }); + +/** + * A cache that read/writes to a specific part of the file-system. + */ +export class FileSystemCache { + private basePathExists: boolean = false; + /** + * Constructor. + * @param options + * - basePath: The folder path to read/write to. + * Default: './build' + * - ns: A single value, or array, that represents a + * a unique namespace within which values for this + * store are cached. + * - extension: An optional file-extension for paths. + */ + constructor(public basePath: string, public ns?: string | any[], public extension?: string) { + this.basePath = formatPath(basePath); + this.ns = f.hash(ns); + if (f.isString(extension)) { this.extension = extension; } + if (f.isFileSync(this.basePath)) { + throw new Error(`The basePath '${this.basePath}' is a file. It should be a folder.`); + } + } + + /** + * Generates the path to the cached files. + * @param {string} key: The key of the cache item. + * @return {string}. 
+ */ + path(key) { + if (f.isNothing(key)) { throw new Error(`Path requires a cache key.`); } + let name = f.hash(key); + if (this.ns) { name = `${this.ns}-${name}`; } + if (this.extension) { + name = `${name}.${this.extension.replace(/^\./, '')}`; + } + return p.join(this.basePath, name); + } + + + /** + * Determines whether the file exists. + * @param {string} key: The key of the cache item. + * @return {Promise} + */ + fileExists(key) { return f.existsP(this.path(key)); } + + + /** + * Ensure that the base path exists. + * @return {Promise} + */ + ensureBasePath() { + return new Promise((resolve, reject) => { + if (this.basePathExists) { + resolve(); + } else { + if (!fs.existsSync(this.basePath)) { + fs.mkdirSync(this.basePath); + } + this.basePathExists = true; + resolve(); + // fs.ensureDir(this.basePath).then(() => { + // this.basePathExists = true; + // resolve(); + // }).catch((err) => { + // reject(err); + // }) + } + }); + } + + + /** + * Gets the contents of the file with the given key. + * @param {string} key: The key of the cache item. + * @param defaultValue: Optional. A default value to return if the value does not exist in cache. + * @return {Promise} - File contents, or + * undefined if the file does not exist. + */ + get(key, defaultValue): Promise { return this.getValueP(this.path(key), defaultValue); } + + + /** + * Gets the contents of the file with the given key. + * @param {string} key: The key of the cache item. + * @param defaultValue: Optional. A default value to return if the value does not exist in cache. + * @return the cached value, or undefined. + */ + getSync(key, defaultValue) { + const path = this.path(key); + return fs.existsSync(path) + ? toGetValue(this.readJsonSync(path)) + : defaultValue; + } + readJsonSync(path) { + try { + // if (fs.existsSync(path)) { + const json = fs.readFileSync(path, "utf8"); + return JSON.parse(json); + // } + } catch (error) { + return {}; + } + } + + /** + * Writes the given value to the file-system. + * @param {string} key: The key of the cache item. + * @param value: The value to write (Primitive or Object). + * @return {Promise} + */ + set(key, value) { + const path = this.path(key); + return new Promise((resolve, reject) => { + this.ensureBasePath() + .then(() => { + fs.writeFile(path, toJson(value), (err) => { + if (err) { reject(err); } else { resolve({ path }); } + }); + }) + .catch(err => reject(err)); + }); + } + + + /** + * Writes the given value to the file-system and memory cache. + * @param {string} key: The key of the cache item. + * @param value: The value to write (Primitive or Object). + * @return the cache. + */ + setSync(key, value) { + fs.writeFileSync(this.path(key), toJson(value)); + return this; + } + + + /** + * Removes the item from the file-system. + * @param {string} key: The key of the cache item. + * @return {Promise} + */ + remove(key) { return f.removeFileP(this.path(key)); } + + + /** + * Removes all items from the cache. + * @return {Promise} + */ + clear() { + return new Promise((resolve, reject) => { + f.filePathsP(this.basePath, this.ns) + .then(paths => { + const remove = (index) => { + const path = paths[index]; + if (path) { + f.removeFileP(path) + .then(() => remove(index + 1)) // <== RECURSION. + .catch(err => reject(err)); + } else { + resolve(); // All files have been removed. + } + }; + remove(0); + }) + .catch(err => reject(err)); + }); + } + + + /** + * Saves several items to the cache in one operation. + * @param {array} items: An array of objects of the form { key, value }. 
+ * @return {Promise} + */ + save(items) { + // Setup initial conditions. + if (!R.is(Array, items)) { items = [items]; } + const isValid = (item) => { + if (!R.is(Object, item)) { return false; } + return item.key && item.value; + }; + items = R.pipe( + R.reject(R.isNil), + R.forEach((item) => { + if (!isValid(item)) { + throw new Error(`Save items not valid, must be an array of {key, value} objects.`); + } + }) + )(items); + + return new Promise((resolve, reject) => { + // Don't continue if no items were passed. + const response = { paths: [] }; + if (items.length === 0) { + resolve(response); + return; + } + + // Recursively set each item to the file-system. + const setValue = (index) => { + const item = items[index]; + if (item) { + this.set(item.key, item.value) + .then((result: any) => { + response.paths[index] = result.path; + setValue(index + 1); // <== RECURSION. + }) + .catch(err => reject(err)); + } else { + // No more items - done. + resolve(response); + } + }; + setValue(0); + }); + } + + + /** + * Loads all files within the cache's namespace. + */ + load() { + return new Promise((resolve, reject) => { + f.filePathsP(this.basePath, this.ns) + .then((paths: any) => { + // Bail out if there are no paths in the folder. + const response = { files: [] }; + if (paths.length === 0) { + resolve(response); + return; + } + + // Get each value. + const getValue = (index) => { + const path = paths[index]; + if (path) { + this.getValueP(path) + .then(result => { + response.files[index] = { path, value: result }; + getValue(index + 1); // <== RECURSION. + }) + .catch(err => reject(err)); + } else { + // All paths have been loaded. + resolve(response); + } + }; + getValue(0); + }) + .catch(err => reject(err)); + }); + } + private getValueP(path, defaultValue?): Promise { + return new Promise((resolve, reject) => { + const result = this.readJsonSync(path); + const value = toGetValue(result); + resolve(value); + }); + } + +} \ No newline at end of file diff --git a/OpenFlowNodeRED/src/index.ts b/OpenFlowNodeRED/src/index.ts index 1e37243f..31fc6f0a 100644 --- a/OpenFlowNodeRED/src/index.ts +++ b/OpenFlowNodeRED/src/index.ts @@ -6,7 +6,7 @@ import { Logger } from "./Logger"; import { WebServer } from "./WebServer"; import { Config } from "./Config"; import { Crypt } from "./nodeclient/Crypt"; -import { FileSystemCache } from "@openiap/openflow-api"; +import { FileSystemCache } from "./file-system-cache"; Logger.configure(false); Logger.instanse.info("index", "", "starting openflow nodered"); @@ -109,7 +109,17 @@ let server: http.Server = null; } } } - socket.setCacheFolder(Config.logpath); + if (Config.enable_file_cache) { + // socket.setCacheFolder(Config.logpath); + try { + const fileCache = require('./file-system-cache'); + const path = require('path'); + socket.messageStore = new fileCache.FileSystemCache(path.join(Config.logpath, '.openflowapicache')); + } catch (error) { + Logger.instanse.error("index", "enable_file_cache", error); + } + + } socket.agent = "nodered"; socket.version = Config.version; Logger.instanse.info("index", "", "VERSION: " + Config.version); diff --git a/OpenFlowNodeRED/src/node-red-contrib-auth-saml.ts b/OpenFlowNodeRED/src/node-red-contrib-auth-saml.ts index b3c39a07..809edd35 100644 --- a/OpenFlowNodeRED/src/node-red-contrib-auth-saml.ts +++ b/OpenFlowNodeRED/src/node-red-contrib-auth-saml.ts @@ -5,7 +5,7 @@ import { fetch, toPassportConfig } from "passport-saml-metadata"; import * as https from "https"; import { Logger, promiseRetry } from "./Logger"; import { 
Config } from "./Config"; -import { FileSystemCache } from "@openiap/openflow-api"; +import { FileSystemCache } from "./file-system-cache"; // tslint:disable-next-line: class-name export class samlauthstrategyoptions { diff --git a/OpenFlowNodeRED/src/node-red-contrib-openflow-storage.ts b/OpenFlowNodeRED/src/node-red-contrib-openflow-storage.ts index 350c13cf..7f667b01 100644 --- a/OpenFlowNodeRED/src/node-red-contrib-openflow-storage.ts +++ b/OpenFlowNodeRED/src/node-red-contrib-openflow-storage.ts @@ -5,7 +5,7 @@ import { nodered_settings } from "./nodered_settings"; import { Config } from "./Config"; import { WebSocketClient, NoderedUtil, Base } from "@openiap/openflow-api"; import * as nodered from "node-red"; -import { FileSystemCache } from "@openiap/openflow-api"; +import { FileSystemCache } from "./file-system-cache"; import { servicename } from "./nodeclient/cliutil"; import { pm2restart } from "./nodeclient/pm2util"; import { Logger } from "./Logger"; @@ -964,7 +964,7 @@ export class noderedcontribopenflowstorage { Logger.instanse.info("storage", "onupdate", "check for exit exitprocess: " + exitprocess + " update: " + update + " " + new Date().toLocaleTimeString()); if (exitprocess && Config.auto_restart_when_needed) { - if (NoderedUtil.isDocker()) { + if (noderedcontribopenflowstorage.isDocker()) { Logger.instanse.info("storage", "onupdate", "Running as docker, just quit process, kubernetes will start a new version"); this.RED.log.warn("noderedcontribopenflowstorage::onupdate: Running as docker, just quit process, kubernetes will start a new version"); process.exit(1); @@ -1042,7 +1042,7 @@ export class noderedcontribopenflowstorage { } } if (exitprocess && Config.auto_restart_when_needed) { - if (NoderedUtil.isDocker()) { + if (noderedcontribopenflowstorage.isDocker()) { Logger.instanse.info("storage", "saveSettings", "Running as docker, just quit process, kubernetes will start a new version"); process.exit(1); } else { @@ -1140,5 +1140,29 @@ export class noderedcontribopenflowstorage { } return null; } + static hasDockerEnv(): boolean { + try { + const fs = require('fs'); + fs.statSync('/.dockerenv'); + return true; + } catch (_) { + return false; + } + } + static hasDockerCGroup() { + try { + const fs = require('fs'); + if (fs.readFileSync('/proc/self/cgroup', 'utf8').includes('docker')) return true; + return fs.readFileSync('/proc/self/cgroup', 'utf8').includes('/kubepods'); + } catch (_) { + return false; + } + } + private static _isDocker: boolean = null; + public static isDocker(): boolean { + if (noderedcontribopenflowstorage._isDocker != null) return noderedcontribopenflowstorage._isDocker; + noderedcontribopenflowstorage._isDocker = noderedcontribopenflowstorage.hasDockerEnv() || noderedcontribopenflowstorage.hasDockerCGroup(); + return false; + } } diff --git a/package.json b/package.json index 3aaa6910..c4bf27f9 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "dependencies": { "@fortawesome/fontawesome-free": "^5.15.3", "@kubernetes/client-node": "0.17.0", - "@openiap/openflow-api": "^2.0.20", + "@openiap/openflow-api": "^2.1.1", "@opentelemetry/api-metrics": "^0.32.0", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0", diff --git a/test/DatabaseConnection.test.ts b/test/DatabaseConnection.test.ts index af1c8550..53d336ea 100644 --- a/test/DatabaseConnection.test.ts +++ b/test/DatabaseConnection.test.ts @@ -92,6 +92,16 @@ import { Crypt } from '../OpenFlow/src/Crypt'; 
assert.strictEqual(items.length, 5, "Root did not find any files"); } @timeout(5000) + @test async 'count'() { + var usercount = await Config.db.count({ collectionname: "users", query: { "_type": "user" }, jwt: this.rootToken }, null); + assert.notDeepStrictEqual(usercount, null); + assert.notStrictEqual(usercount, 0); + var rolecount = await Config.db.count({ collectionname: "users", query: { "_type": "role" }, jwt: this.rootToken }, null); + assert.notDeepStrictEqual(rolecount, null); + assert.notStrictEqual(rolecount, 0); + assert.notStrictEqual(usercount, rolecount); + } + @timeout(5000) @test async 'GetDocumentVersion'() { let item = new Base(); item.name = "item version 0"; item = await Config.db.InsertOne(item, "entities", 1, true, this.userToken, null); diff --git a/test/workitemqueue.test.ts b/test/workitemqueue.test.ts index 026a2423..56b7482f 100644 --- a/test/workitemqueue.test.ts +++ b/test/workitemqueue.test.ts @@ -1,13 +1,20 @@ +import fs = require('fs'); +// import path = require('path'); +import pako = require('pako'); + + var wtf = require('wtfnode'); const path = require("path"); const env = path.join(process.cwd(), 'config', '.env'); require("dotenv").config({ path: env }); // , debug: false -import { AddWorkitem, NoderedUtil, WebSocketClient, Workitem } from '@openiap/openflow-api'; +import { AddWorkitem, MessageWorkitemFile, NoderedUtil, WebSocketClient, Workitem } from '@openiap/openflow-api'; import { suite, test, timeout } from '@testdeck/mocha'; import assert = require('assert'); import { Config } from '../OpenFlow/src/Config'; import { Logger } from '../OpenFlow/src/Logger'; +// C:\code\openflow-api + @suite class workitemqueue { private socket: WebSocketClient = null; @timeout(10000) @@ -71,6 +78,23 @@ import { Logger } from '../OpenFlow/src/Logger'; // await NoderedUtil.UpdateWorkitem({ _id: item._id, state: "successful" }); } + public static async CreateWorkitemFilesArray(files: string[], compressed: boolean): Promise { + var result: MessageWorkitemFile[] = []; + for (var i = 0; i < files.length; i++) { + let file: MessageWorkitemFile = new MessageWorkitemFile(); + file.filename = path.basename(files[i]); + if (fs.existsSync(files[i])) { + if (compressed) { + file.compressed = true; + file.file = Buffer.from(pako.deflate(fs.readFileSync(files[i], null))).toString('base64'); + } else { + file.file = fs.readFileSync(files[i], { encoding: 'base64' }); + } + result.push(file); + } else { throw new Error("File not found " + files[i]) } + } + return result; + } @timeout(10000) @test async 'basic workitem test with files'() { let q = await NoderedUtil.GetWorkitemQueue({ name: "test queue" }); From 532da3f8b0f9566b4c5b5fabae76dfd6637f284c Mon Sep 17 00:00:00 2001 From: Allan Zimmermann Date: Sat, 8 Oct 2022 20:26:20 +0200 Subject: [PATCH 4/4] add recursive for deletemany when using ids --- OpenFlow/src/DatabaseConnection.ts | 11 ++++++++--- OpenFlow/src/Messages/Message.ts | 12 ++++++------ OpenFlow/src/OAuthProvider.ts | 2 +- OpenFlowNodeRED/package.json | 2 +- package.json | 2 +- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index 78997679..95356fb3 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -2964,7 +2964,7 @@ export class DatabaseConnection extends events.EventEmitter { for (let i = 0; i < collections.length; i++) { let collection = collections[i]; // var res = await this.DeleteMany(query, null, collection.name, null, jwt, 
span); - var res = await this.DeleteMany({}, null, collection.name, doc._id, jwt, span); + var res = await this.DeleteMany({}, null, collection.name, doc._id, false, jwt, span); Logger.instanse.info("DatabaseConnection", "DeleteOne", "[" + user.username + "][" + collection.name + "] Deleted " + res + " items from " + collection.name + " cleaning up after company " + doc.name); } // } @@ -3049,7 +3049,7 @@ export class DatabaseConnection extends events.EventEmitter { continue; } let startTime = new Date(); - var res = await this.DeleteMany({ "$or": [{ "_createdbyid": doc._id }, { "_modifiedbyid": doc._id }] }, null, collection.name, doc._id, jwt, span); + var res = await this.DeleteMany({ "$or": [{ "_createdbyid": doc._id }, { "_modifiedbyid": doc._id }] }, null, collection.name, doc._id, false, jwt, span); // @ts-ignore var timeDiff = ((new Date()) - startTime); //in ms Logger.instanse.info("DatabaseConnection", "DeleteOne", "[" + user.username + "][" + collection.name + "] Deleted " + res + " items from " + collection.name + " cleaning up after user " + doc.name + " (" + timeDiff + "ms)"); @@ -3108,7 +3108,7 @@ export class DatabaseConnection extends events.EventEmitter { * @param {string} jwt JWT of user who is doing the delete, ensuring rights * @returns Promise */ - async DeleteMany(query: string | any, ids: string[], collectionname: string, queryas: string, jwt: string, parent: Span): Promise { + async DeleteMany(query: string | any, ids: string[], collectionname: string, queryas: string, recursive: boolean, jwt: string, parent: Span): Promise { if (NoderedUtil.IsNullUndefinded(ids) && NoderedUtil.IsNullUndefinded(query)) { throw Error("id cannot be null"); } const span: Span = Logger.otel.startSubSpan("db.DeleteMany", parent); try { @@ -3201,6 +3201,11 @@ export class DatabaseConnection extends events.EventEmitter { } Logger.instanse.verbose("DatabaseConnection", "DeleteMany", "[" + user.username + "][" + collectionname + "] deleted " + deletecounter + " files in database"); return deletecounter; + } else if (recursive && !NoderedUtil.IsNullUndefinded(ids) && ids.length > 0) { + for (let i = 0; i < ids.length; i++) { + await this.DeleteOne(ids[i], collectionname, recursive, jwt, span); + } + return ids.length; } else { let bulkInsert = this.db.collection(collectionname + "_hist").initializeUnorderedBulkOp(); let bulkRemove = this.db.collection(collectionname).initializeUnorderedBulkOp() diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index b1bb47d4..114bcbba 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -1474,7 +1474,7 @@ export class Message { try { msg = DeleteManyMessage.assign(this.data); if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; } - msg.affectedrows = await Config.db.DeleteMany(msg.query, msg.ids, msg.collectionname, null, msg.jwt, span); + msg.affectedrows = await Config.db.DeleteMany(msg.query, msg.ids, msg.collectionname, null, msg.recursive, msg.jwt, span); delete msg.ids; } catch (error) { if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; } @@ -5202,7 +5202,7 @@ export class Message { msg.result = await Config.db._UpdateOne(null, wiq as any, "mq", 1, true, jwt, parent); if (msg.purge) { - await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, jwt, parent); + await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, false, jwt, parent); var items = await Config.db.query({ query: { "_type": 
"workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent); if (items.length > 0) { } @@ -5210,7 +5210,7 @@ export class Message { if (items.length > 0) { throw new Error("Failed purging workitemqueue " + wiq.name); } - await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, jwt, parent); + await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, false, jwt, parent); } } catch (error) { await handleError(null, error); @@ -5249,7 +5249,7 @@ export class Message { user = this.tuser; if (msg.purge) { - await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, jwt, parent); + await Config.db.DeleteMany({ "_type": "workitem", "wiqid": wiq._id }, null, "workitems", null, false, jwt, parent); var items = await Config.db.query({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent); if (items.length > 0) { items = await Config.db.query({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent); @@ -5257,7 +5257,7 @@ export class Message { if (items.length > 0) { throw new Error("Failed purging workitemqueue " + wiq.name); } - await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, jwt, parent); + await Config.db.DeleteMany({ "metadata.wiqid": wiq._id }, null, "fs.files", null, false, jwt, parent); } else { var items = await Config.db.query({ query: { "_type": "workitem", "wiqid": wiq._id }, collectionname: "workitems", top: 1, jwt }, parent); if (items.length > 0) { @@ -5296,7 +5296,7 @@ export class Message { switch (msg.command) { case "dumpwebsocketclients": if (!this.tuser.HasRoleId(WellknownIds.admins)) throw new Error("Access denied"); - await Config.db.DeleteMany({ "_type": "websocketclient" }, null, "websocketclients", null, jwt, parent); + await Config.db.DeleteMany({ "_type": "websocketclient" }, null, "websocketclients", null, false, jwt, parent); amqpwrapper.Instance().send("openflow", "", { "command": "dumpwebsocketclients" }, 10000, null, "", 1); break; case "killwebsocketclient": diff --git a/OpenFlow/src/OAuthProvider.ts b/OpenFlow/src/OAuthProvider.ts index eca84b2c..5be89a7c 100644 --- a/OpenFlow/src/OAuthProvider.ts +++ b/OpenFlow/src/OAuthProvider.ts @@ -436,7 +436,7 @@ export class Account { return token.item; } static async RemoveTokenRequest(code: string, parent: Span) { - let tokens = await Config.db.DeleteMany({ _type: "tokenrequest", "code": code }, null, "oauthtokens", null, Crypt.rootToken(), parent); + let tokens = await Config.db.DeleteMany({ _type: "tokenrequest", "code": code }, null, "oauthtokens", null, false, Crypt.rootToken(), parent); return tokens[0]; } } diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index 8afcf028..287dff2a 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -25,7 +25,7 @@ }, "dependencies": { "@nodemailer/mailparser2": "^1.0.3", - "@openiap/openflow-api": "^2.1.1", + "@openiap/openflow-api": "^2.1.2", "@opentelemetry/api-metrics": "^0.32.0", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0", diff --git a/package.json b/package.json index c4bf27f9..76e85e09 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "dependencies": { "@fortawesome/fontawesome-free": "^5.15.3", "@kubernetes/client-node": "0.17.0", - "@openiap/openflow-api": "^2.1.1", + "@openiap/openflow-api": "^2.1.2", 
"@opentelemetry/api-metrics": "^0.32.0", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.32.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.32.0",