refactor: Implement msgspec encoding #2541

Draft
wants to merge 16 commits into
base: main

Conversation

edgarrmondragon
Collaborator

@edgarrmondragon edgarrmondragon commented Jul 17, 2024

  • Docs
  • UX. Relying on mro magic is not great, e.g. why SQLiteTap(MsgSpecWriter, SQLTap) and not SQLiteTap(SQLTap, MsgSpecWriter). A better approach might be to make the IO implementation an attribute of the Singer class.
  • Fix tests
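To make the UX item above concrete, here is a minimal sketch of the "IO implementation as an attribute" idea. It assumes nothing about the SDK's real classes: `MessageWriter`, `JSONLinesWriter` and `ExampleTap` are hypothetical names, and the stdlib `json` module stands in for whichever encoder is plugged in.

```python
from __future__ import annotations

import io
import json
from typing import Protocol


class MessageWriter(Protocol):
    """Hypothetical interface for anything that can emit a Singer message."""

    def write_message(self, message: dict) -> None: ...


class JSONLinesWriter:
    """Stand-in writer that uses the stdlib json module."""

    def __init__(self, stream: io.BufferedIOBase) -> None:
        self.stream = stream

    def write_message(self, message: dict) -> None:
        # One JSON document per line, written as bytes.
        self.stream.write(json.dumps(message).encode() + b"\n")
        self.stream.flush()


class ExampleTap:
    """Hypothetical tap that holds its writer instead of inheriting from it."""

    def __init__(self, writer: MessageWriter) -> None:
        self.writer = writer

    def sync(self) -> None:
        self.writer.write_message({"type": "STATE", "value": {}})


out = io.BytesIO()
ExampleTap(writer=JSONLinesWriter(out)).sync()
```

With this shape there is no base-class ordering to get wrong: swapping the serialization layer means passing a different writer object.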

📚 Documentation preview 📚: https://meltano-sdk--2541.org.readthedocs.build/en/2541/

Summary by Sourcery

Implement msgspec encoding for improved performance.

Enhancements:

  • Replace the default JSON encoder and decoder with msgspec for serialization and deserialization.

Tests:

  • Update tests to accommodate the msgspec implementation.


codspeed-hq bot commented Jul 17, 2024

CodSpeed Performance Report

Merging #2541 will improve performance by ×12

Comparing edgarrmondragon/refactor/msgspec-impl-naive (815f056) with main (a8e6bf0)

Summary

⚡ 2 improvements
✅ 5 untouched benchmarks

Benchmarks breakdown

| Benchmark | main | edgarrmondragon/refactor/msgspec-impl-naive | Change |
|---|---|---|---|
| test_bench_deserialize_json | 23.8 ms | 5.4 ms | ×4.4 |
| test_bench_format_message | 52.6 ms | 4.4 ms | ×12 |


codecov bot commented Jul 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 91.35%. Comparing base (a8e6bf0) to head (815f056).
Report is 6 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2541      +/-   ##
==========================================
+ Coverage   91.29%   91.35%   +0.06%     
==========================================
  Files          62       63       +1     
  Lines        5190     5227      +37     
  Branches      669      669              
==========================================
+ Hits         4738     4775      +37     
  Misses        319      319              
  Partials      133      133              


@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 97de9f7 to 1d9e947 Compare July 17, 2024 02:00
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 1d9e947 to 4febeba Compare July 17, 2024 02:59
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 4febeba to cbe10bd Compare July 17, 2024 03:00
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 8876b80 to f691f78 Compare July 25, 2024 15:45
@edgarrmondragon edgarrmondragon added this to the 0.41.0 milestone Aug 14, 2024
@BuzzCutNorman
Contributor

I found that it helped to add a default_output attribute for the SingerWriter to use. This lets you make write_message a little more generic.

default_output = sys.stdout.buffer

def write_message(self, message: Message) -> None:
	"""Write a message to stdout.

	Args:
		message: The message to write.
	"""
	self.default_output.write(self.format_message(message))
	self.default_output.flush()

@BuzzCutNorman
Contributor

BuzzCutNorman commented Aug 16, 2024

In json.py, to follow the msgspec performance suggestions and fit into the framework you put in place, I created a separate function for generating JSONL so that serialize_json can keep returning strings. This way serialize_json can still be used in the connector engine creation and in process_batch_files.

https://jcristharif.com/msgspec/perf-tips.html#line-delimited-json

json.py:

def serialize_json(obj: object, **kwargs: t.Any) -> str:
    """Serialize a dictionary into a line of json.

    Args:
        obj: A Python object usually a dict.
        **kwargs: Optional key word arguments.

    Returns:
        A string of serialized json.
    """
    return encoder.encode(obj).decode()

msg_buffer = bytearray(64)

def serialize_jsonl(obj: object, **kwargs: t.Any) -> bytes:
    """Serialize a dictionary into a line of jsonl.

    Args:
        obj: A Python object usually a dict.
        **kwargs: Optional key word arguments.

    Returns:
        A bytes of serialized json.
    """
    encoder.encode_into(obj, msg_buffer)
    msg_buffer.extend(b"\n")
    return msg_buffer

SingerWriter:

    def serialize_message(self, message: Message) -> str | bytes:
        """Serialize a dictionary into a line of json.

        Args:
            message: A Singer message object.

        Returns:
            A string of serialized json.
        """
        return serialize_jsonl(message.to_dict())

@edgarrmondragon edgarrmondragon added the Release Highlight Call this out in the release notes label Aug 22, 2024
@edgarrmondragon
Collaborator Author

edgarrmondragon commented Sep 6, 2024

I found that it helped to add a default_output attribute for the SingerWriter to use. This lets you make write_message a little more generic.

default_output = sys.stdout.buffer

def write_message(self, message: Message) -> None:
	"""Write a message to stdout.

	Args:
		message: The message to write.
	"""
	self.default_output.write(self.format_message(message))
	self.default_output.flush()

Do you mean in

class MsgSpecWriter(GenericSingerWriter[bytes, Message]):
    """Interface for all plugins writing Singer messages to stdout."""

    def serialize_message(self, message: Message) -> bytes:  # noqa: PLR6301
        """Serialize a dictionary into a line of json.

        Args:
            message: A Singer message object.

        Returns:
            A string of serialized json.
        """
        return encoder.encode(message.to_dict())

    def write_message(self, message: Message) -> None:
        """Write a message to stdout.

        Args:
            message: The message to write.
        """
        sys.stdout.buffer.write(self.format_message(message) + b"\n")
        sys.stdout.flush()

?

@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 29dea7a to d23a8ab Compare September 6, 2024 18:56
@BuzzCutNorman
Contributor

BuzzCutNorman commented Sep 6, 2024

Yes, that is exactly what I meant. I could definitely have stated it more clearly 😅.

 class MsgSpecWriter(GenericSingerWriter[bytes, Message]): 
     """Interface for all plugins writing Singer messages to stdout.""" 
     
     default_output = sys.stdout.buffer
     
     def serialize_message(self, message: Message) -> bytes:  # noqa: PLR6301 
         """Serialize a dictionary into a line of json. 
  
         Args: 
             message: A Singer message object. 
  
         Returns: 
             A string of serialized json. 
         """ 
         return serialize_jsonl(message.to_dict()) 
  
     def write_message(self, message: Message) -> None: 
         """Write a message to stdout. 
  
         Args: 
             message: The message to write. 
         """ 
         self.default_output.write(self.format_message(message))
         self.default_output.flush()

@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from d23a8ab to 3169b58 Compare September 6, 2024 19:15
@edgarrmondragon edgarrmondragon changed the title refactor: Implement (naive) msgspec encoding refactor: Implement msgspec encoding Sep 6, 2024
@edgarrmondragon
Collaborator Author

Naive of me to think I could get this across in 1/2 a day of work 😅. I'll come back to this later, there's plenty of time until the planned release date.

@BuzzCutNorman
Contributor

Like the pun 😊. Great dad joke material. Kind of an inside joke now, since you dropped (naive) from the title of the PR.

@edgarrmondragon
Collaborator Author

Might have to punt this if jcrist/msgspec#711 doesn't get merged before we're ready to officially declare Python 3.13 support

@edgarrmondragon edgarrmondragon modified the milestones: v0.41.0, v0.42.0 Oct 2, 2024
@edgarrmondragon edgarrmondragon modified the milestones: v0.42.0, v0.43.0 Oct 9, 2024
@edgarrmondragon edgarrmondragon modified the milestones: v0.43, v0.44 Nov 19, 2024
@BuzzCutNorman
Contributor

I found that it helped to add a default_output attribute for the SingerWriter to use. This lets you make write_message a little more generic.

 class MsgSpecWriter(GenericSingerWriter[bytes, Message]): 
     """Interface for all plugins writing Singer messages to stdout.""" 
     
     default_output = sys.stdout.buffer
     
     def serialize_message(self, message: Message) -> bytes:  # noqa: PLR6301 
         """Serialize a dictionary into a line of json. 
  
         Args: 
             message: A Singer message object. 
  
         Returns: 
             A string of serialized json. 
         """ 
         return serialize_jsonl(message.to_dict()) 
  
     def write_message(self, message: Message) -> None: 
         """Write a message to stdout. 
  
         Args: 
             message: The message to write. 
         """ 
         self.default_output.write(self.format_message(message))
         self.default_output.flush()

Well, it turns out that making this generic with default_output was a bad move. It is best to stick with sys.stdout or sys.stdout.buffer. I couldn't get the built-in SDK tap tests to work until I changed a tap utilizing msgspec back to using sys.stdout.buffer. I borrowed your use of io.TextIOWrapper(buffer=io.BytesIO()) from the test_msgspec.py file in this PR and added it to singer_sdk.testing.runners.TapTestRunner._execute_sync, so that redirect_stdout doesn't error because stdout_buf = io.StringIO() lacks a .buffer to write to.

    def _execute_sync(self) -> tuple[str, str]:
        """Invoke a Tap object and return STDOUT and STDERR results in StringIO buffers.

        Returns:
            A 2-item tuple with StringIO buffers from the Tap's output: (stdout, stderr)
        """
        stdout_buf = io.TextIOWrapper(buffer=io.BytesIO())
        stderr_buf = io.TextIOWrapper(buffer=io.BytesIO())
        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            self.run_sync_dry_run()
        stdout_buf.seek(0)
        stderr_buf.seek(0)
        return stdout_buf.read(), stderr_buf.read()

In summary, I had to change the tap's override of write_message to use sys.stdout.buffer.write() and sys.stdout.flush(). I then changed singer_sdk.testing.runners.TapTestRunner._execute_sync to use io.TextIOWrapper(buffer=io.BytesIO()) instead of io.StringIO() for stdout_buf and stderr_buf. The built-in Singer SDK tap tests ran correctly after changing the tap and installing, in the virtual environment I ran the tap tests from, a local development branch of singer-sdk with the change to TapTestRunner._execute_sync.
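For anyone following along, the failure mode described above can be reproduced with the standard library alone. This is only a sketch: `write_bytes_to_stdout` is a hypothetical helper that mimics what a bytes-based `write_message` does, not SDK code.

```python
import io
import sys
from contextlib import redirect_stdout


def write_bytes_to_stdout(payload: bytes) -> None:
    """Write raw bytes the way a bytes-based writer would."""
    sys.stdout.buffer.write(payload)
    sys.stdout.flush()


# io.StringIO is text-only: it has no underlying binary buffer.
string_buf = io.StringIO()
try:
    with redirect_stdout(string_buf):
        write_bytes_to_stdout(b'{"type": "STATE"}\n')
    string_io_error = None
except AttributeError as exc:
    string_io_error = exc

# A TextIOWrapper around BytesIO exposes .buffer, so the same write succeeds.
wrapped_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(wrapped_buf):
    write_bytes_to_stdout(b'{"type": "STATE"}\n')
wrapped_buf.seek(0)
captured = wrapped_buf.read()
```

Here `string_io_error` ends up holding the AttributeError raised on `.buffer`, while `captured` holds the decoded line.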

@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/refactor/msgspec-impl-naive branch from 712466f to 07f348e Compare December 28, 2024 19:24
@edgarrmondragon
Collaborator Author

Ok, the tests are passing.

Now I want to think of how to make it easy and straightforward for a developer to use msgspec as the SerDe layer, and also keep the door open to the user being the one deciding which serialization layer to use.
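One possible shape for that choice, purely as a sketch and not a proposal for the SDK's API: pick the encoder by name and fall back to the standard library when msgspec is not installed. `pick_encoder` and `_encode_with_json` are hypothetical names; the only msgspec call used is `msgspec.json.encode`.

```python
import importlib.util
import json
from typing import Callable


def _encode_with_json(obj: object) -> bytes:
    """Stdlib fallback that produces compact JSON bytes."""
    return json.dumps(obj, separators=(",", ":")).encode()


def pick_encoder(preferred: str = "msgspec") -> tuple[str, Callable[[object], bytes]]:
    """Return (name, encode) for the preferred layer, or the json fallback."""
    if preferred == "msgspec" and importlib.util.find_spec("msgspec") is not None:
        import msgspec  # optional dependency, imported only when available

        return "msgspec", msgspec.json.encode
    return "json", _encode_with_json


name, encode = pick_encoder()
line = encode({"a": 1}) + b"\n"
```

The `preferred` argument could come from a plugin setting or an environment variable, which would leave the decision with the user rather than the developer.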

@edgarrmondragon edgarrmondragon self-assigned this Jan 13, 2025
@edgarrmondragon edgarrmondragon modified the milestones: v0.44, v0.45 Jan 15, 2025
@edgarrmondragon
Collaborator Author

@sourcery-ai review


sourcery-ai bot commented Jan 22, 2025

Reviewer's Guide by Sourcery

This pull request refactors the SDK to use msgspec for encoding and decoding Singer messages, which should improve performance. It also updates the test suite to use the new encoding.

Sequence diagram for MsgSpec message encoding flow

sequenceDiagram
    participant T as Tap
    participant MW as MsgSpecWriter
    participant SO as System Output

    T->>MW: write_message(message)
    activate MW
    MW->>MW: serialize_message(message)
    MW->>MW: format_message()
    MW->>SO: write to stdout buffer
    MW->>SO: flush stdout
    deactivate MW

Class diagram for MsgSpec encoding implementation

classDiagram
    class GenericSingerReader~T~ {
        <<abstract>>
        +default_input
        +deserialize_json(line: str)* dict
    }
    class GenericSingerWriter~T, M~ {
        <<abstract>>
        +serialize_message(message: M)* T
        +write_message(message: M) void
    }
    class MsgSpecReader {
        +default_input: sys.stdin
        +deserialize_json(line: str) dict
    }
    class MsgSpecWriter {
        +serialize_message(message: Message) bytes
        +write_message(message: Message) void
    }
    class Message {
        +to_dict() dict
    }

    GenericSingerReader <|-- MsgSpecReader
    GenericSingerWriter <|-- MsgSpecWriter
    MsgSpecWriter ..> Message

    note for MsgSpecReader "Uses msgspec for fast JSON deserialization"
    note for MsgSpecWriter "Uses msgspec for fast JSON serialization"

File-Level Changes

Change: Implemented msgspec encoding and decoding for Singer messages.
Details:
  • Added msgspec as a dependency.
  • Created MsgSpecReader and MsgSpecWriter classes.
  • Implemented enc_hook and dec_hook for handling non-native types.
  • Implemented serialize_jsonl for serializing messages.
  • Added tests for the new encoding.
Files:
  • singer_sdk/_singerlib/encoding/_msgspec.py
  • tests/_singerlib/encoding/test_msgspec.py

Change: Updated the test suite to use the new encoding.
Details:
  • Removed the old tests for the simple encoding.
  • Added benchmarks for the new encoding.
  • Updated the test suite to use the new encoding.
  • Updated the test suite to use TextIOWrapper instead of StringIO.
Files:
  • tests/core/test_io.py
  • tests/samples/test_tap_countries.py
  • tests/_singerlib/encoding/test_simple.py

Change: Updated sample taps to use the new encoding.
Details:
  • Updated the sample taps to use the new MsgSpecWriter class.
Files:
  • samples/sample_tap_countries/countries_tap.py
  • samples/sample_tap_sqlite/__init__.py

Change: Updated noxfile to include msgspec.
Details:
  • Added msgspec to the list of extras in noxfile.py.
  • Added msgspec as a dependency in pyproject.toml.
Files:
  • noxfile.py
  • pyproject.toml

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time. You can also use
    this command to specify where the summary should be inserted.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.


@sourcery-ai sourcery-ai bot left a comment


Hey @edgarrmondragon - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider making the IO implementation an attribute of the Singer class rather than using multiple inheritance, to avoid MRO ordering issues. This would provide a cleaner and more explicit design.
Here's what I looked at during the review
  • 🟢 General issues: all looks good
  • 🟡 Security: 1 issue found
  • 🟡 Testing: 2 issues found
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good



encoder = msgspec.json.Encoder(enc_hook=enc_hook, decimal_format="number")
decoder = msgspec.json.Decoder(dec_hook=dec_hook, float_hook=decimal.Decimal)
_jsonl_msg_buffer = bytearray(64)

🚨 issue (security): The global bytearray buffer could cause thread-safety issues and the fixed size might be insufficient for larger messages

Consider using a thread-local buffer or creating a new buffer for each message. Also, the buffer size should either be dynamic or much larger to handle varying message sizes safely.
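A rough sketch of the thread-local variant, using only the standard library: `json.dumps` stands in for the msgspec encoder, and `serialize_jsonl_threadsafe` and `_get_buffer` are hypothetical names. On the size concern, my understanding of the msgspec docs is that `encode_into` resizes the bytearray as needed, so the initial 64 bytes would be a starting capacity rather than a hard limit; treat that as an assumption to verify.

```python
import json
import threading

_local = threading.local()


def _get_buffer() -> bytearray:
    """Return this thread's reusable buffer, creating it on first use."""
    buf = getattr(_local, "buffer", None)
    if buf is None:
        buf = _local.buffer = bytearray()
    return buf


def serialize_jsonl_threadsafe(obj: object) -> bytes:
    """Encode obj as one JSON line; json.dumps stands in for the msgspec encoder."""
    buf = _get_buffer()
    buf.clear()  # bytearray grows on demand, so no fixed size limit applies
    buf += json.dumps(obj, separators=(",", ":")).encode()
    buf += b"\n"
    return bytes(buf)  # copy, so later calls cannot mutate the returned value


first = serialize_jsonl_threadsafe({"a": 1})
second = serialize_jsonl_threadsafe({"b": 2})
```

The final copy gives up part of the benefit of reusing a buffer. Returning the shared bytearray directly avoids the copy, but then the caller has to write it out before the next call on the same thread.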

Comment on lines +13 to +16
from singer_sdk._singerlib.encoding._msgspec import (
MsgSpecReader,
MsgSpecWriter,
dec_hook,

suggestion (testing): Test coverage for encoding helper functions.

It would be beneficial to add tests specifically for enc_hook and dec_hook to ensure they handle various data types and edge cases correctly. For example, test with datetimes, decimals, and other non-native JSON types.

Suggested implementation:

import decimal
import io
from datetime import datetime, timezone
from contextlib import nullcontext, redirect_stdout
from textwrap import dedent

import pytest
from singer_sdk._singerlib.exceptions import InvalidInputLine


def test_enc_hook_datetime():
    """Test encoding datetime objects."""
    dt = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
    result = enc_hook(dt)
    assert result == "2023-01-01T12:00:00+00:00"


def test_enc_hook_decimal():
    """Test encoding decimal objects."""
    dec = decimal.Decimal("123.45")
    result = enc_hook(dec)
    assert result == "123.45"


def test_enc_hook_none():
    """Test encoding None values."""
    result = enc_hook(None)
    assert result is None


def test_dec_hook_datetime():
    """Test decoding datetime strings."""
    dt_str = "2023-01-01T12:00:00+00:00"
    result = dec_hook(dt_str)
    assert isinstance(result, datetime)
    assert result == datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)


def test_dec_hook_decimal():
    """Test decoding decimal strings."""
    dec_str = "123.45"
    result = dec_hook(dec_str)
    assert isinstance(result, decimal.Decimal)
    assert result == decimal.Decimal("123.45")


def test_dec_hook_none():
    """Test decoding None values."""
    result = dec_hook(None)
    assert result is None


def test_dec_hook_invalid_datetime():
    """Test decoding invalid datetime strings."""
    invalid_dt = "not-a-datetime"
    result = dec_hook(invalid_dt)
    assert result == invalid_dt  # Should return original string if parsing fails


def test_enc_hook_unsupported_type():
    """Test encoding unsupported type."""
    class UnsupportedType:
        pass

    obj = UnsupportedType()
    with pytest.raises(TypeError):
        enc_hook(obj)



def test_write_message():
writer = SingerWriter()

suggestion (testing): Benchmark tests should include both simple and msgspec encoding.

Now that we have two encoding methods, the benchmark tests should compare both SimpleSingerWriter and MsgSpecWriter to assess the performance impact of the change.

Suggested implementation:

def test_bench_format_message_msgspec(benchmark, bench_record_message: RecordMessage):
    """Run benchmark for message formatting using MsgSpecWriter."""
    number_of_runs = 1000

    writer = MsgSpecWriter()


def test_bench_format_message_simple(benchmark, bench_record_message: RecordMessage):
    """Run benchmark for message formatting using SimpleSingerWriter."""
    number_of_runs = 1000

    writer = SimpleSingerWriter()

You'll also need to:

  1. Import SimpleSingerWriter if not already imported
  2. Ensure bench_record_message fixture is compatible with both writers

@@ -111,7 +111,12 @@ def get_standard_target_tests(
return []


def tap_sync_test(tap: Tap) -> tuple[io.StringIO, io.StringIO]:

issue (complexity): Consider creating a type alias to simplify the function signatures.

The type annotations have become overly verbose while not adding proportional value. Create a type alias to simplify the signatures while maintaining type safety:

# At the top of the file
SingerIO = io.TextIOWrapper[io.BytesIO]

# Then simplify function signatures like:
def tap_sync_test(
    tap: Tap,
) -> tuple[SingerIO, SingerIO]:
    stdout_buf = SingerIO(io.BytesIO(), encoding="utf-8")
    ...

def tap_to_target_sync_test(
    tap: Tap,
    target: Target,
) -> tuple[SingerIO, SingerIO, SingerIO, SingerIO]:
    ...

This maintains the same type safety while improving readability and making future type changes easier to maintain.
