Skip to content

Commit

Permalink
Merge branch 'main' into update/jaccard-string
Browse files Browse the repository at this point in the history
  • Loading branch information
jstammers authored Nov 20, 2024
2 parents d6a1b39 + 6433a12 commit 52bcbbc
Show file tree
Hide file tree
Showing 32 changed files with 2,173 additions and 481 deletions.
1 change: 1 addition & 0 deletions docs/reference/block.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ becomes intractable for datasets much larger than a few thousand.
Generate pairs where records share a single key, eg "first_name"

::: mismo.block.KeyBlocker
::: mismo.block.CountsTable

## Ensemble Blockers
Blockers that use other Blockers
Expand Down
1 change: 0 additions & 1 deletion docs/reference/lib/geo.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ This contains utilities, blockers, and comparers relevant to geospatial data.
## Addresses

::: mismo.lib.geo.us_census_geocode
::: mismo.lib.geo.AddressFeatures
::: mismo.lib.geo.AddressesDimension
::: mismo.lib.geo.AddressesMatchLevel
::: mismo.lib.geo.match_level
Expand Down
9 changes: 9 additions & 0 deletions mismo/_find/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""
Still private API: The record linkage task of finding needles in a haystack, AKA lookup, search, ...
""" # noqa: E501

from __future__ import annotations

from mismo._find._find_results import FindResults as FindResults
from mismo._find._find_results import LabeledTable as LabeledTable
from mismo._find._find_results import LinkTable as LinkTable
287 changes: 287 additions & 0 deletions mismo/_find/_find_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
from __future__ import annotations

import functools
from textwrap import dedent
from typing import TYPE_CHECKING, Literal
import warnings

import ibis
from ibis import _
from ibis.expr import types as ir

from mismo import _util

if TYPE_CHECKING:
import altair as alt


class LinkCountsTable(_util.TableWrapper):
"""A table representing the number of records by the number of matches.
eg "There were 700 records with 0 matches, 300 with 1 match, 20 with 2 matches, ..."
"""

n: ir.IntegerColumn
"""The number of records."""
n_matches: ir.IntegerColumn
"""The number of matches."""

def chart(self) -> alt.Chart:
"""A bar chart of the number of records by the number of matches."""
import altair as alt

n_title = "Number of Records"
key_title = "Number of Matches"
mins = self.order_by(_.n.desc()).limit(2).execute()
if len(mins) > 1:
subtitle = f"eg '{mins.n[0]:,} records had {mins.n_matches[0]} matches, {mins.n[1]:,} had {mins.n_matches[1]} matches, ...'" # noqa: E501
elif len(mins) == 1:
subtitle = (
f"eg '{mins.n[0]:,} records had {mins.n_matches[0]} matches', ..."
)
else:
subtitle = "eg 'there were 1000 records with 0 matches, 500 with 1 match, 100 with 2 matches, ...'" # noqa: E501
chart = (
alt.Chart(self)
.properties(
title=alt.TitleParams(
"Number of Records by Match Count",
subtitle=subtitle,
anchor="middle",
),
width=alt.Step(12) if self.count().execute() <= 20 else alt.Step(8),
)
.mark_bar()
.encode(
alt.X("n_matches:O", title=key_title, sort="x"),
alt.Y(
"n:Q",
title=n_title,
scale=alt.Scale(type="symlog"),
),
tooltip=[
alt.Tooltip("n:Q", title=n_title, format=","),
alt.Tooltip("n_matches:O", title=key_title),
],
)
)
return chart


class LabeledTable(ibis.Table):
"""
A table with a `record_ids` column added of the match(es) from the other table.
This represents one of the input tables, augmented with another column
that points to the best match(es) in the other table.
"""

record_id: ir.Column
"""The record_id of this table."""
record_ids: ir.ArrayColumn
"""The record_ids of the matches in the other table. Never NULL, if no matches, empty array.""" # noqa: E501


class LinkTable(ibis.Table):
"""
A table with a `record_id_l` and `record_id_r` column, representing matching pairs.
"""

record_id_l: ir.Column
record_id_r: ir.Column


class FindResults:
"""A Dataclass representing the results of a find operation.
For example, you have an existing database that is clean, and you want
to ingest some new, messy data.
For each of the records in the new, messy data, you want to find
the matches in the existing database.
The `haystack` is the existing database, the `needle` is the new data,
and the `links` are the pairs of matches between the two.
"""

def __init__(
self, *, haystack: ir.Table, needle: LabeledTable, links: LinkTable
) -> None:
"""Create a new FindResults object.
Parameters
----------
haystack
The table being searched. Must contain a `record_id` column.
needle
The table being searched for. Must contain a `record_id` column.
links
The pairs of matches between the needle and the haystack.
!!! warning
`record_id_l` is the record_id of the haystack,
`record_id_r` is the record_id of the needle.
"""
self._haystack = haystack
self._needle = needle
self._links = links

def haystack(self) -> ir.Table:
"""The table being searched."""
return self._haystack

def needle(self) -> ir.Table:
"""The table being searched for."""
return self._needle

def links(self) -> ir.Table:
"""The pairs of matches between the needle and the haystack."""
return self._links

def needle_labeled(self) -> LabeledTable:
"""The needle, with a `record_ids` column added of matches from the haystack."""
n = self.needle()
if "record_ids" in n.columns:
warnings.warn("Column 'record_ids' will be overwritten in needle.")
n = n.drop("record_ids")

lookup = (
self.links()
.group_by(record_id=_.record_id_r)
.agg(record_ids=_.record_id_l.collect())
)
return _util.join_lookup(n, lookup, "record_id", defaults={"record_ids": []})

def needle_labeled_none(self) -> LabeledTable:
"""The subset of needle_labeled with no matches."""
return self.needle_labeled().filter(_.record_ids.length() == 0)

def needle_labeled_single(
self, *, format: Literal["single", "array"] = "single"
) -> ir.Table:
"""The subset of needle_labeled with exactly one match.
Parameters
----------
format
- "single": Return a table with a `record_id` column, without the `record_ids` column.
- "array": Return the table as-is.
""" # noqa: E501
raw = self.needle_labeled().filter(_.record_ids.length() == 1)
if format == "single":
return raw.mutate(record_id=_.record_ids[0]).drop("record_ids")
elif format == "array":
return raw
else:
raise ValueError(f"format must be 'single' or 'array', got {format:!r}")

def needle_labeled_many(self) -> LabeledTable:
"""The subset of needle_labeled with more than one match."""
return self.needle_labeled().filter(_.record_ids.length() > 1)

def needle_match_counts(self) -> LinkCountsTable:
"""
A histogram of the number of records in the needle, binned by number of matches.
e.g. "There were 700 records with 0 matches, 300 with 1 match,
20 with 2 matches, ..."
"""
n_matches_by_record = self.links().record_id_r.value_counts(name="n")
counts = n_matches_by_record.group_by(n_matches=_.n).agg(n=_.count())
n_not_linked = (
self.needle()
.select("record_id")
.distinct()
.filter(_.record_id.notin(self.links().record_id_r))
.count()
)
extra = (
n_not_linked.name("n").as_table().mutate(n_matches=0).cast(counts.schema())
)
counts = counts.union(extra)
counts = counts.order_by(_.n_matches)
return LinkCountsTable(counts)

def haystack_labeled(self) -> LabeledTable:
"""The haystack, with a `record_ids` column added of matches from the needle."""
h = self.haystack()
if "record_ids" in h.columns:
warnings.warn("Column 'record_ids' will be overwritten in haystack.")
h = h.drop("record_ids")

lookup = (
self.links()
.group_by(record_id=_.record_id_l)
.agg(record_ids=_.record_id_r.collect())
)
return _util.join_lookup(h, lookup, "record_id", defaults={"record_ids": []})

def haystack_labeled_none(self) -> LabeledTable:
"""The subset of haystack with no matches."""
return self.haystack_labeled().filter(_.record_ids.length() == 0)

def haystack_labeled_single(
self, *, format: Literal["single", "array"] = "single"
) -> ir.Table:
"""The subset of haystack_labeled with exactly one match.
Parameters
----------
format
- "single": Return a table with a `record_id` column, without the `record_ids` column.
- "array": Return the table as-is.
""" # noqa: E501
raw = self.haystack_labeled().filter(_.record_ids.length() == 1)
if format == "single":
return raw.mutate(record_id=_.record_ids[0]).drop("record_ids")
elif format == "array":
return raw
else:
raise ValueError(f"format must be 'single' or 'array', got {format:!r}")

def haystack_labeled_many(self) -> LabeledTable:
"""The subset of haystack with more than one match."""
return self.haystack_labeled().filter(_.record_ids.length() > 1)

def haystack_match_counts(self) -> LinkCountsTable:
"""
A histogram of the records in the haystack, binned by number of matches.
e.g. "There were 700 records with 0 matches, 300 with 1 match,
20 with 2 matches, ..."
"""
n_matches_by_record = self.links().record_id_l.value_counts(name="n")
counts = n_matches_by_record.group_by(n_matches=_.n).agg(n=_.count())
n_not_linked = (
self.haystack()
.select("record_id")
.distinct()
.filter(_.record_id.notin(self.links().record_id_l))
.count()
)
extra = (
n_not_linked.name("n").as_table().mutate(n_matches=0).cast(counts.schema())
)
counts = counts.union(extra)
counts = counts.order_by(_.n_matches)
return LinkCountsTable(counts)

@functools.cache
def __str__(self) -> str:
return dedent(
f"""
FindResults(
haystack.count()={self.haystack().count().execute():,},
haystack_labeled_none().count()={self.haystack_labeled_none().count().execute():,},
haystack_labeled_single().count()={self.haystack_labeled_single().count().execute():,},
haystack_labeled_any().count()={self.haystack_labeled_single().count().execute():,},
needle.count()={self.needle().count().execute():,},
needle_labeled.count()={self.needle_labeled().count().execute():,})
needle_labeled_none.count()={self.needle_labeled_none().count().execute():,},
needle_labeled_single().count()={self.needle_labeled_single().count().execute():,},
needle_labeled_many().count()={self.needle_labeled_many().count().execute():,},
)
""".strip()
)

def __repr__(self) -> str:
return self.__str__()
46 changes: 46 additions & 0 deletions mismo/_structs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

from ibis.expr import types as ir


def mutate(struct: ir.StructValue, **kwargs: ir.Value) -> ir.StructValue:
"""Mutate a struct by adding or replacing columns.
Analogous to ibis.Table.mutate(**kwargs).
"""
default = {field: struct[field] for field in struct.fields}
return ir.struct({**default, **kwargs})


def drop(struct: ir.StructValue, *fields: str) -> ir.StructValue:
"""Mutate a struct by dropping columns.
Analogous to ibis.Table.drop(fields)."""
new_fields = {
field: struct[field] for field in struct.fields if field not in fields
}
return ir.struct(new_fields)


def select(struct: ir.Column, *fields: str) -> ir.Column:
"""Select columns from a struct.
Analogous to ibis.Table.select(fields)."""
return ir.struct({field: struct[field] for field in fields})


def rename(struct: ir.StructValue, **renamings: str) -> ir.StructValue:
"""Rename the fields in a struct, analogous to Table.rename()"""
fields = {field: struct[field] for field in struct.fields}
for new, old in renamings.items():
fields[new] = fields.pop(old)
return ir.struct(fields)


def unpack(struct: ir.StructValue) -> tuple[ir.Value, ...]:
"""Unpack the values of a struct into a tuple.
Replacement for the deprecated struct.destructure() method,
and analogous to the Table.unpack("my_struct_col") method.
"""
return (struct[field_name].name(field_name) for field_name in struct.type().names)
Loading

0 comments on commit 52bcbbc

Please sign in to comment.