diff --git a/CHANGES.md b/CHANGES.md index 318fa3b..cfe61d6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,7 @@ - MongoDB: Improve user interface messages - MongoDB: Strip single leading underscore character from all top-level fields - MongoDB: Map OID types to CrateDB TEXT columns +- MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index a8f73bf..cd22ad4 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -10,7 +10,7 @@ logger = logging.getLogger(__name__) -def mongodb_copy(source_url, target_url, progress: bool = False): +def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False): """ Synopsis -------- @@ -36,7 +36,7 @@ def mongodb_copy(source_url, target_url, progress: bool = False): # 1. Extract schema from MongoDB collection. 
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full" + url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -64,7 +64,9 @@ def mongodb_copy(source_url, target_url, progress: bool = False): f"Transferring data from MongoDB to CrateDB: " f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) - export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection) + export_args = argparse.Namespace( + url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit + ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index fd3ba01..563af81 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -24,6 +24,7 @@ def extract_parser(subargs): choices=["full", "partial"], help="Whether to fully scan the MongoDB collections or only partially.", ) + parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") parser.add_argument("-o", "--out", required=False) @@ -42,6 +43,7 @@ def export_parser(subargs): parser.add_argument("--host", default="localhost", help="MongoDB host") parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") + parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") def get_args(): diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 
1e7561b..fae1f8a 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -95,7 +95,7 @@ def extract(args) -> t.Dict[str, t.Any]: schemas = {} for collection in filtered_collections: - schemas[collection] = extract_schema_from_collection(db[collection], partial) + schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit) return schemas @@ -121,6 +121,6 @@ def export(args) -> t.IO[bytes]: """ buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) - collection_to_json(db[args.collection], file=buffer) + collection_to_json(db[args.collection], fp=buffer, limit=args.limit) buffer.seek(0) return buffer diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 23eb45d..c4ca025 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,7 +25,6 @@ """ import calendar -import sys import typing as t import bsonjs @@ -57,6 +56,12 @@ def timestamp_converter(value): def extract_value(value, parent_type=None): + """ + Decode MongoDB Extended JSON. + + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ if isinstance(value, dict): if len(value) == 1: for k, v in value.items(): @@ -73,13 +78,16 @@ def extract_value(value, parent_type=None): def convert(d): + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ newdict = {} for k, v in sanitize_field_names(d).items(): newdict[k] = extract_value(v) return newdict -def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.Any] = None): +def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0): """ Export a MongoDB collection's documents to standard JSON. The output is suitable to be consumed by the `cr8` program. 
@@ -88,11 +96,10 @@ def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.A a Pymongo collection object. file - a file-like object (stream); defaults to the current sys.stdout. + a file-like object (stream). """ - file = file or sys.stdout.buffer - for document in collection.find(): + for document in collection.find().limit(limit): bson_json = bsonjs.dumps(document.raw) json_object = json.loads(bson_json) - file.write(json.dumps(convert(json_object))) - file.write(b"\n") + fp.write(json.dumps(convert(json_object))) + fp.write(b"\n") diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index aa8a258..e11b99c 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -84,7 +84,7 @@ ) -def extract_schema_from_collection(collection: Collection, partial: bool) -> t.Dict[str, t.Any]: +def extract_schema_from_collection(collection: Collection, partial: bool, limit: int = 0) -> t.Dict[str, t.Any]: """ Extract a schema definition from a collection. @@ -100,7 +100,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool) -> t.D with progressbar: t = progressbar.add_task(collection.name, total=count) try: - for document in collection.find(): + for document in collection.find().limit(limit=limit): schema["count"] += 1 schema["document"] = extract_schema_from_document(document, schema["document"]) progressbar.update(t, advance=1) diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index e6e8d24..65b19e1 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -66,10 +66,15 @@ invoke `export` and `cr8` to actually transfer data. ## Usage for `migr8` -The program `migr8` offers three subcommands (`extract`, `translate`, `export`), +The program `migr8` offers three subcommands `extract`, `translate`, and `export`, to conclude data transfers from MongoDB to CrateDB. 
Please read this section carefully to learn how they can be used successfully. +If you intend to evaluate `migr8` on a small portion of your data in MongoDB, the +`--limit` command-line option for the `migr8 extract` and `migr8 export` +subcommands might be useful. Using `--limit 10000` is usually both good and fast +enough to assess whether the schema translation and data transfer work well. + ```shell migr8 --version migr8 --help