Skip to content

Commit

Permalink
MongoDB: Make migr8 extract and migr8 export accept --limit option
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 14, 2024
1 parent a6c2f74 commit 6c32d71
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- MongoDB: Improve user interface messages
- MongoDB: Strip single leading underscore character from all top-level fields
- MongoDB: Map OID types to CrateDB TEXT columns
- MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
8 changes: 5 additions & 3 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, progress: bool = False):
def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False):
"""
Synopsis
--------
Expand All @@ -36,7 +36,7 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full"
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
Expand Down Expand Up @@ -64,7 +64,9 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
f"Transferring data from MongoDB to CrateDB: "
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection)
export_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

Expand Down
2 changes: 2 additions & 0 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def extract_parser(subargs):
choices=["full", "partial"],
help="Whether to fully scan the MongoDB collections or only partially.",
)
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("-o", "--out", required=False)


Expand All @@ -42,6 +43,7 @@ def export_parser(subargs):
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")


def get_args():
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def extract(args) -> t.Dict[str, t.Any]:

schemas = {}
for collection in filtered_collections:
schemas[collection] = extract_schema_from_collection(db[collection], partial)
schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit)
return schemas


Expand All @@ -121,6 +121,6 @@ def export(args) -> t.IO[bytes]:
"""
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], file=buffer)
collection_to_json(db[args.collection], fp=buffer, limit=args.limit)
buffer.seek(0)
return buffer
21 changes: 14 additions & 7 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"""

import calendar
import sys
import typing as t

import bsonjs
Expand Down Expand Up @@ -57,6 +56,12 @@ def timestamp_converter(value):


def extract_value(value, parent_type=None):
"""
Decode MongoDB Extended JSON.
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
if len(value) == 1:
for k, v in value.items():
Expand All @@ -73,13 +78,16 @@ def extract_value(value, parent_type=None):


def convert(d):
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
newdict = {}
for k, v in sanitize_field_names(d).items():
newdict[k] = extract_value(v)
return newdict


def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.Any] = None):
def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0):
"""
Export a MongoDB collection's documents to standard JSON.
The output is suitable to be consumed by the `cr8` program.
Expand All @@ -88,11 +96,10 @@ def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.A
a Pymongo collection object.
file
a file-like object (stream); defaults to the current sys.stdout.
a file-like object (stream).
"""
file = file or sys.stdout.buffer
for document in collection.find():
for document in collection.find().limit(limit):
bson_json = bsonjs.dumps(document.raw)
json_object = json.loads(bson_json)
file.write(json.dumps(convert(json_object)))
file.write(b"\n")
fp.write(json.dumps(convert(json_object)))
fp.write(b"\n")
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
)


def extract_schema_from_collection(collection: Collection, partial: bool) -> t.Dict[str, t.Any]:
def extract_schema_from_collection(collection: Collection, partial: bool, limit: int = 0) -> t.Dict[str, t.Any]:
"""
Extract a schema definition from a collection.
Expand All @@ -100,7 +100,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool) -> t.D
with progressbar:
t = progressbar.add_task(collection.name, total=count)
try:
for document in collection.find():
for document in collection.find().limit(limit=limit):
schema["count"] += 1
schema["document"] = extract_schema_from_document(document, schema["document"])
progressbar.update(t, advance=1)
Expand Down
7 changes: 6 additions & 1 deletion doc/io/mongodb/migr8.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ invoke `export` and `cr8` to actually transfer data.

## Usage for `migr8`

The program `migr8` offers three subcommands (`extract`, `translate`, `export`),
The program `migr8` offers three subcommands, `extract`, `translate`, and `export`,
to carry out data transfers from MongoDB to CrateDB. Please read this section
carefully to learn how they can be used successfully.

If you intend to evaluate `migr8` on a small portion of your data in MongoDB, the
`--limit` command-line option for the `migr8 extract` and `migr8 export`
subcommands might be useful. Using `--limit 10000` is usually sufficient and fast
enough to assess whether the schema translation and data transfer work well.

```shell
migr8 --version
migr8 --help
Expand Down

0 comments on commit 6c32d71

Please sign in to comment.