rewrite unarchivePatches so if blob is gone/broken, timetravel is automatically reset and admins get a message

- hopefully! This is hard to test in dev mode.
williamstein committed Jan 7, 2025
1 parent 22f3d20 commit 6528e5f
Showing 6 changed files with 125 additions and 64 deletions.
58 changes: 3 additions & 55 deletions src/packages/database/postgres-blobs.coffee
@@ -30,6 +30,7 @@ required = defaults.required

{expire_time, one_result, all_results} = require('./postgres-base')
{delete_patches} = require('./postgres/delete-patches')
blobs = require('./postgres/blobs')

{filesystem_bucket} = require('./filesystem-bucket')

@@ -763,61 +764,8 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
delete_patches(db:@, string_id: opts.string_id, cb:cb)
], (err) => opts.cb?(err))

unarchive_patches: (opts) =>
opts = defaults opts,
string_id : required
cb : undefined
dbg = @_dbg("unarchive_patches(string_id='#{opts.string_id}')")
where = {"string_id = $::CHAR(40)" : opts.string_id}
@_query
query : "SELECT archived FROM syncstrings"
where : where
cb : one_result 'archived', (err, blob_uuid) =>
if err or not blob_uuid?
opts.cb?(err)
return
blob = undefined
async.series([
#(cb) =>
# For testing only!
# setTimeout(cb, 7000)
(cb) =>
dbg("download blob")
@get_blob
uuid : blob_uuid
cb : (err, x) =>
if err
cb(err)
else if not x?
cb("blob is gone")
else
blob = x
cb(err)
(cb) =>
dbg("extract blob")
try
patches = JSON.parse(blob)
catch e
cb("corrupt patches blob -- #{e}")
return
@import_patches
patches : patches
cb : cb
(cb) =>
async.parallel([
(cb) =>
dbg("update syncstring to indicate that patches are now available")
@_query
query : "UPDATE syncstrings SET archived=NULL"
where : where
cb : cb
(cb) =>
dbg('delete blob, which is no longer needed')
@delete_blob
uuid : blob_uuid
cb : cb
], cb)
], (err) => opts.cb?(err))
unarchivePatches: (string_id) =>
await blobs.unarchivePatches({db:@, string_id:string_id})

###
Export/import of syncstring history and info. Right now used mainly for debugging
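The CoffeeScript method is now a thin wrapper awaiting the TypeScript implementation. A hypothetical caller (not part of this commit), sketching how errors now surface through try/catch instead of a cb argument:

// Hypothetical caller sketch; `db` is the PostgreSQL instance from @cocalc/database.
async function ensureHistoryAvailable(db: any, string_id: string): Promise<void> {
  try {
    await db.unarchivePatches(string_id);
    console.log(`patches for ${string_id} are available`);
  } catch (err) {
    console.error(`unarchiving ${string_id} failed:`, err);
  }
}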
86 changes: 86 additions & 0 deletions src/packages/database/postgres/blobs.ts
@@ -0,0 +1,86 @@
import getLogger from "@cocalc/backend/logger";
import type { PostgreSQL } from "./types";
import getPool from "@cocalc/database/pool";
import { callback2 } from "@cocalc/util/async-utils";

const logger = getLogger("database:blobs");

export async function unarchivePatches({
db,
string_id,
}: {
db: PostgreSQL;
string_id: string;
}) {
const dbg = (...args) => {
logger.debug("unarchivePatches", { string_id }, ...args);
};
dbg();

const pool = getPool();
const { rows } = await pool.query(
"SELECT archived, project_id, path FROM syncstrings WHERE string_id=$1",
[string_id],
);
if (rows.length == 0) {
throw Error(`no syncstring with id ${string_id}`);
}
const uuid = rows[0].archived;
if (!uuid) {
dbg("it is not archived");
return;
}
dbg("download blob");
let blob;
let error = "";
try {
blob = await callback2(db.get_blob, { uuid });
} catch (err) {
dbg(`WARNING -- unable to get blob with id ${uuid}`, err);
blob = null;
error = `${err}`;
}
if (blob == null) {
if (db.adminAlert != null) {
dbg("NONFATAL ERROR -- blob is GONE!");
// Instead of failing outright, we give up on just the syncstring history and
// send a message to admins to look into it. This is better than completely blocking
// the user's access to the file, especially since they still have the file on disk
// along with filesystem snapshots. Also, this *should* never happen. I'm writing this
// because I switched .compute-servers.syncdb between ephemeral and non-ephemeral,
// which seems to have broken some of these, and I think we also hit this once or twice before.
await db.adminAlert({
subject: `missing TimeTravel history for path='${rows[0].path}'`,
body: `The blob with TimeTravel history for editing path='${rows[0].path}' is missing.
Instead of breaking things for the user, the file should still work, but with its history reset. That said,
an admin should look into this.
- project_id='${rows[0].project_id}'
- path='${rows[0].path}'
- string_id='${string_id}'
- error='${error}'
`,
});
} else {
// can't even alert admins
dbg("FATAL ERROR -- blob is gone (unable to alert admins)");
throw Error("blob is gone");
}
} else {
dbg("extract blob");
const patches = JSON.parse(blob);
await callback2(db.import_patches, { patches });
}

dbg("update syncstring to indicate that patches are now available");
await pool.query("UPDATE syncstrings SET archived=NULL WHERE string_id=$1", [
string_id,
]);
if (blob != null) {
dbg("delete blob, which is now no longer needed");
await callback2(db.delete_blob, { uuid });
}
}
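Throughout blobs.ts the callback-style database methods (get_blob, import_patches, delete_blob) are driven through callback2. A minimal sketch of that bridge, assuming the real helper in @cocalc/util/async-utils behaves equivalently:

// Minimal callback2 sketch: call f with the given opts plus a node-style cb,
// and settle a promise from whatever that cb reports.
function callback2Sketch<T = any>(
  f: (opts: any) => void,
  opts: object = {},
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    f({ ...opts, cb: (err: any, result?: T) => (err ? reject(err) : resolve(result as T)) });
  });
}

// usage, mirroring the code above:
// const blob = await callback2Sketch(db.get_blob, { uuid });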
13 changes: 8 additions & 5 deletions src/packages/database/postgres/delete-patches.ts
@@ -6,14 +6,15 @@
// This manages deleting patches. The underlying problem is that there could be a large number of patches, which stalls the DB.
// It's better to keep the number of row deletions small, to speed up the operation, only lock less rows for a shorter amount of time, etc.

import debug from "debug";
const L = debug("hub:db:delete-patches");
import { PostgreSQL } from "./types";
import getLogger from "@cocalc/backend/logger";
import type { PostgreSQL } from "./types";
import { delay } from "awaiting";

const logger = getLogger("database:delete-patches");

// max number of patches to delete at once – 10000 should take a few seconds
const MAX_AT_ONCE = parseInt(
process.env.SYNCSTRING_DELETE_MAX_AT_ONCE ?? "10000"
process.env.SYNCSTRING_DELETE_MAX_AT_ONCE ?? "10000",
);
// delay between deleting a chunk of patches
const DELAY_S = parseInt(process.env.SYNCSTRING_DELETE_DELAY_CHUNK_S ?? "1");
@@ -48,7 +49,9 @@ export async function delete_patches(opts: DeletePatchesOpts): Promise<void> {
while (true) {
const limit = await patchset_limit({ db, string_id });

L(`deleting patches string_id='${string_id}' until limit='${limit}'`);
logger.debug(
`deleting patches string_id='${string_id}' until limit='${limit}'`,
);
const where = { "string_id = $::CHAR(40)": string_id };
if (limit != null) {
where["time <= $::TIMESTAMP"] = limit;
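The header comment explains why deletion is chunked: one bounded DELETE at a time keeps row locks short, with a pause between chunks. An illustrative standalone sketch of that pattern (the table name and ctid-based query are placeholders, not the actual statement in delete-patches.ts):

import { delay } from "awaiting";
import getPool from "@cocalc/database/pool";

async function deletePatchesInChunks(
  string_id: string,
  maxAtOnce = 10000, // cf. SYNCSTRING_DELETE_MAX_AT_ONCE
  delayMs = 1000,    // cf. SYNCSTRING_DELETE_DELAY_CHUNK_S (seconds, converted to ms)
): Promise<void> {
  const pool = getPool();
  while (true) {
    // Bound each DELETE so it locks few rows and finishes quickly.
    const { rowCount } = await pool.query(
      `DELETE FROM patches WHERE ctid IN
         (SELECT ctid FROM patches WHERE string_id=$1 LIMIT $2)`,
      [string_id, maxAtOnce],
    );
    if ((rowCount ?? 0) < maxAtOnce) break; // last (possibly partial) chunk done
    await delay(delayMs); // pause between chunks to reduce sustained DB load
  }
}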
15 changes: 15 additions & 0 deletions src/packages/database/postgres/types.ts
@@ -353,6 +353,21 @@ export interface PostgreSQL extends EventEmitter {
projectControl?: (project_id: string) => Project;

ensure_connection_to_project?: (project_id: string, cb?: CB) => Promise<void>;

get_blob(opts: {
uuid: string;
save_in_db?: boolean;
touch?: boolean;
cb: CB;
}): void;

import_patches(opts: { patches: string[]; string_id?: string; cb?: CB });
delete_blob(opts: { uuid: string; cb?: CB });

adminAlert?: (opts: {
subject: string;
body?: string;
}) => Promise<number | undefined>;
}

// This is an extension of BaseProject in projects/control/base.ts
4 changes: 4 additions & 0 deletions src/packages/server/messages/admin-alert.ts
@@ -21,6 +21,7 @@ import { getServerSettings } from "@cocalc/database/settings/server-settings";
import send from "./send";
import getAdmins from "@cocalc/server/accounts/admins";
import { getLogger } from "@cocalc/backend/logger";
import { db } from "@cocalc/database";

const logger = getLogger("server:messages:admin");

@@ -91,3 +92,6 @@ export default async function adminAlert({
}
}
}

// Set adminAlert on the db singleton (which is implemented in coffeescript).
db().adminAlert = adminAlert;
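This assignment is a dependency-inversion trick: @cocalc/database declares adminAlert as an optional field (see types.ts above) because it cannot import @cocalc/server, and the server package injects the implementation when this module loads. A simplified standalone sketch of the pattern, with illustrative names:

// Low-level package: declares the hook but cannot implement it.
interface DbWithHooks {
  adminAlert?: (opts: { subject: string; body?: string }) => Promise<number | undefined>;
}
const theDb: DbWithHooks = {}; // stands in for the coffeescript db singleton

// High-level package: provides the implementation and wires it in at load time.
async function sendAdminAlert({ subject, body }: { subject: string; body?: string }) {
  console.log("alerting admins:", subject, body ?? "");
  return undefined; // the real adminAlert returns a message id
}
theDb.adminAlert = sendAdminAlert;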
13 changes: 9 additions & 4 deletions src/packages/util/db-schema/syncstring-schema.ts
@@ -112,11 +112,16 @@ Table({
obj,
account_id,
project_id,
function (err) {
async (err) => {
if (!err) {
// only calls cb once patch is unarchived, since new sync
// rewrite doesn't use changefeed on database.
db.unarchive_patches({ string_id: obj.string_id, cb });
try {
// only calls cb once patch is unarchived, since new sync
// rewrite doesn't use changefeed on database.
await db.unarchivePatches(obj.string_id);
cb();
} catch (err2) {
cb(err2);
}
} else {
cb(err);
}
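This hunk is the inverse bridge of callback2: the schema hook must report through a node-style cb, but unarchivePatches now returns a promise, so the promise's outcome is unwrapped back into cb inside an async callback. A generic sketch of that adaptation (helper name assumed):

// Run a promise-returning function and report its outcome through a node-style cb.
function promiseToCallback(
  f: () => Promise<void>,
  cb: (err?: unknown) => void,
): void {
  f().then(() => cb(), (err) => cb(err));
}

// usage, mirroring the schema change above:
// promiseToCallback(() => db.unarchivePatches(obj.string_id), cb);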
