Skip to content

entie package

entie

Public API for entie: MongoDB helpers built on entei-core.

entei-core provides collection-to-columnar materialization; entie adds PyMongo connection helpers, bulk row inserts, a small lazy DataFrame-style API, and expression helpers. Pure Python on top of PyMongo (no native stack).

EnteiDataFrame

Lazy view over a collection; filters and projection run in Python when `collect()` is called.

Reads the collection once via `entei_core.mongo_root_to_column_dict` (a full `find()`), then applies the `filter_rows` predicates and the `select` column order. Not a streaming or server-side aggregation API.

Source code in packages/entie/src/entie/dataframe.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class EnteiDataFrame:
    """Lazy view over a collection; filters and projection run in Python on :meth:`collect`.

    Reads the collection once via :func:`entei_core.mongo_root_to_column_dict`
    (full ``find()``), then applies ``filter_rows`` predicates and ``select``
    column order. Not a streaming or server-side aggregation API.

    :meth:`select` and :meth:`filter_rows` never modify the frame they are
    called on; each returns a new frame over the same collection.
    """

    __slots__ = ("_collection", "_fields", "_filters", "_projection")

    def __init__(
        self,
        collection: Any,
        *,
        fields: tuple[str, ...] | None = None,
        filters: tuple[Callable[[dict[str, Any]], bool], ...] = (),
        projection: tuple[str, ...] | None = None,
    ) -> None:
        """Use :meth:`from_collection` to construct; constructor is for chaining.

        Parameters
        ----------
        collection:
            PyMongo collection (or compatible) scanned on ``collect``.
        fields:
            Column names passed to :class:`~entei_core.mongo_root.MongoRoot`; ``None``
            means infer keys from documents.
        filters:
            Predicates applied in order to row dicts after materialization.
        projection:
            If set, output columns are restricted to these names (after filters).

        Notes
        -----
        ``fields``, ``filters`` and ``projection`` are copied into tuples, so
        any iterable is accepted and later mutation of the caller's sequence
        does not affect the frame.
        """
        self._collection = collection
        # filter_rows extends the chain with tuple concatenation; storing a
        # list here would make that concatenation raise TypeError.
        self._fields = None if fields is None else tuple(fields)
        self._filters = tuple(filters)
        self._projection = None if projection is None else tuple(projection)

    @classmethod
    def from_collection(
        cls,
        collection: Any,
        *,
        fields: Sequence[str] | None = None,
    ) -> EnteiDataFrame:
        """Build a frame from a PyMongo (or mongomock) collection.

        Parameters
        ----------
        collection:
            Collection whose documents are read when :meth:`collect` runs.
        fields:
            Ordered top-level field names. If ``None``, keys are inferred from
            documents (union, sorted). If ``()`` or ``[]`` (normalized to ``()``),
            no columns are materialized. A bare string is treated as a single
            field name. Names must be unique.

        Returns
        -------
        EnteiDataFrame
            Lazy frame; call :meth:`collect` to load data.

        Raises
        ------
        ValueError
            If ``fields`` contains duplicate names.

        See Also
        --------
        entei_core.mongo_root.MongoRoot : Semantics of ``fields`` and empty collections.
        """
        if fields is None:
            return cls(collection, fields=None)
        # A str is itself a Sequence[str]: tuple("name") would silently turn
        # into ("n", "a", "m", "e"), so a bare string counts as one name.
        names = (fields,) if isinstance(fields, str) else tuple(fields)
        # Reject duplicates here, like select() does, instead of waiting for
        # MongoRoot to raise the same ValueError inside collect().
        if len(names) != len(set(names)):
            raise ValueError("fields must not contain duplicate names")
        return cls(collection, fields=names)

    def select(self, *columns: str) -> EnteiDataFrame:
        """Keep only the given output columns (applied after ``filter_rows``).

        Parameters
        ----------
        *columns:
            One or more distinct field names.

        Returns
        -------
        EnteiDataFrame
            New frame with projection set (replaces any earlier projection).

        Raises
        ------
        ValueError
            If no columns, or if any name is duplicated.
        """
        if not columns:
            raise ValueError("select() requires at least one column name")
        if len(columns) != len(set(columns)):
            raise ValueError("select() column names must be unique")
        return EnteiDataFrame(
            self._collection,
            fields=self._fields,
            filters=self._filters,
            projection=tuple(columns),
        )

    def filter_rows(self, predicate: Callable[[dict[str, Any]], bool]) -> EnteiDataFrame:
        """Return a frame that keeps rows where ``predicate(row)`` is true.

        Parameters
        ----------
        predicate:
            Called with each row as a ``dict[str, Any]`` (top-level fields).

        Returns
        -------
        EnteiDataFrame
            New frame with ``predicate`` appended to the filter chain.
        """
        return EnteiDataFrame(
            self._collection,
            fields=self._fields,
            filters=self._filters + (predicate,),
            projection=self._projection,
        )

    @overload
    def collect(self, *, as_lists: Literal[True] = True) -> dict[str, list[Any]]: ...

    @overload
    def collect(self, *, as_lists: Literal[False]) -> list[dict[str, Any]]: ...

    def collect(
        self,
        *,
        as_lists: bool = True,
    ) -> dict[str, list[Any]] | list[dict[str, Any]]:
        """Materialize: scan collection, apply filters, then optional projection.

        Parameters
        ----------
        as_lists:
            If ``True`` (default), return ``dict[str, list]`` columnar data. If
            ``False``, return ``list[dict]`` rows.

        Returns
        -------
        dict[str, list] or list[dict]
            Columnar or row-oriented result consistent with ``as_lists``.
        """
        root = MongoRoot(self._collection, fields=self._fields)
        cols = mongo_root_to_column_dict(root)
        rows = _columns_to_rows(cols)
        # Each predicate only sees the rows that survived the previous one.
        for pred in self._filters:
            rows = [r for r in rows if pred(r)]
        proj = self._projection
        if proj is not None:
            # Projected names absent from a row come out as None (dict.get).
            rows = [{k: r.get(k) for k in proj} for r in rows]
            keys = list(proj)
        else:
            keys = list(cols.keys())
        if as_lists:
            return _rows_to_columns(rows, keys)
        return rows

__init__(collection, *, fields=None, filters=(), projection=None)

Use :meth:from_collection to construct; constructor is for chaining.

Parameters:

Name Type Description Default
collection Any

PyMongo collection (or compatible) scanned on collect.

required
fields tuple[str, ...] | None

Column names passed to :class:~entei_core.mongo_root.MongoRoot; None means infer keys from documents.

None
filters tuple[Callable[[dict[str, Any]], bool], ...]

Predicates applied in order to row dicts after materialization.

()
projection tuple[str, ...] | None

If set, output columns are restricted to these names (after filters).

None
Source code in packages/entie/src/entie/dataframe.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    collection: Any,
    *,
    fields: tuple[str, ...] | None = None,
    filters: tuple[Callable[[dict[str, Any]], bool], ...] = (),
    projection: tuple[str, ...] | None = None,
) -> None:
    """Use :meth:`from_collection` to construct; constructor is for chaining.

    Parameters
    ----------
    collection:
        PyMongo collection (or compatible) scanned on ``collect``.
    fields:
        Column names passed to :class:`~entei_core.mongo_root.MongoRoot`; ``None``
        means infer keys from documents.
    filters:
        Predicates applied in order to row dicts after materialization.
    projection:
        If set, output columns are restricted to these names (after filters).
    """
    self._collection = collection
    self._fields = fields
    self._filters = filters
    self._projection = projection

collect(*, as_lists=True)

collect(*, as_lists: Literal[True] = True) -> dict[str, list[Any]]
collect(*, as_lists: Literal[False]) -> list[dict[str, Any]]

Materialize: scan collection, apply filters, then optional projection.

Parameters:

Name Type Description Default
as_lists bool

If True (default), return dict[str, list] columnar data. If False, return list[dict] rows.

True

Returns:

Type Description
dict[str, list] or list[dict]

Columnar or row-oriented result consistent with as_lists.

Source code in packages/entie/src/entie/dataframe.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def collect(
    self,
    *,
    as_lists: bool = True,
) -> dict[str, list[Any]] | list[dict[str, Any]]:
    """Load the collection and return the filtered, optionally projected data.

    The columns are materialized through ``MongoRoot`` and
    ``mongo_root_to_column_dict``, converted to row dicts, passed through
    every stored predicate in order, and finally narrowed to the projection
    when one is set.

    Parameters
    ----------
    as_lists:
        ``True`` (default) returns columnar ``dict[str, list]`` data;
        ``False`` returns the rows as ``list[dict]``.

    Returns
    -------
    dict[str, list] or list[dict]
        Result in the layout selected by ``as_lists``.
    """
    columns = mongo_root_to_column_dict(MongoRoot(self._collection, fields=self._fields))
    survivors = _columns_to_rows(columns)
    # Every predicate runs over the rows left by the one before it.
    for keep in self._filters:
        survivors = [row for row in survivors if keep(row)]
    wanted = self._projection
    if wanted is None:
        names = list(columns.keys())
    else:
        # dict.get: a projected name missing from a row yields None.
        survivors = [{name: row.get(name) for name in wanted} for row in survivors]
        names = list(wanted)
    return _rows_to_columns(survivors, names) if as_lists else survivors

filter_rows(predicate)

Return a frame that keeps rows where predicate(row) is true.

Parameters:

Name Type Description Default
predicate Callable[[dict[str, Any]], bool]

Called with each row as a dict[str, Any] (top-level fields).

required

Returns:

Type Description
EnteiDataFrame

New frame with predicate appended to the filter chain.

Source code in packages/entie/src/entie/dataframe.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def filter_rows(self, predicate: Callable[[dict[str, Any]], bool]) -> EnteiDataFrame:
    """Add a row predicate and return the resulting frame.

    Parameters
    ----------
    predicate:
        Receives one row as ``dict[str, Any]`` (top-level fields); rows for
        which it returns a true value are kept.

    Returns
    -------
    EnteiDataFrame
        New frame whose filter chain is this frame's chain followed by
        ``predicate``; collection, fields and projection are carried over.
    """
    extended_chain = self._filters + (predicate,)
    carried_over = {"fields": self._fields, "projection": self._projection}
    return EnteiDataFrame(self._collection, filters=extended_chain, **carried_over)

from_collection(collection, *, fields=None) classmethod

Build a frame from a PyMongo (or mongomock) collection.

Parameters:

Name Type Description Default
collection Any

Collection whose documents are read when :meth:collect runs.

required
fields Sequence[str] | None

Ordered top-level field names. If None, keys are inferred from documents (union, sorted). If () or [] (normalized to ()), no columns are materialized. Names must be unique.

None

Returns:

Type Description
EnteiDataFrame

Lazy frame; call :meth:collect to load data.

See Also

entei_core.mongo_root.MongoRoot : Semantics of fields and empty collections.

Source code in packages/entie/src/entie/dataframe.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@classmethod
def from_collection(
    cls,
    collection: Any,
    *,
    fields: Sequence[str] | None = None,
) -> EnteiDataFrame:
    """Build a frame from a PyMongo (or mongomock) collection.

    Parameters
    ----------
    collection:
        Collection whose documents are read when :meth:`collect` runs.
    fields:
        Ordered top-level field names. If ``None``, keys are inferred from
        documents (union, sorted). If ``()`` or ``[]`` (normalized to ``()``),
        no columns are materialized. A bare string is treated as a single
        field name. Names must be unique.

    Returns
    -------
    EnteiDataFrame
        Lazy frame; call :meth:`collect` to load data.

    Raises
    ------
    ValueError
        If ``fields`` contains duplicate names.

    See Also
    --------
    entei_core.mongo_root.MongoRoot : Semantics of ``fields`` and empty collections.
    """
    if fields is None:
        return cls(collection, fields=None)
    # A str is itself a Sequence[str]: tuple("name") would silently turn into
    # ("n", "a", "m", "e"), so a bare string counts as one field name.
    names = (fields,) if isinstance(fields, str) else tuple(fields)
    # Fail at construction time rather than deferring the duplicate-name
    # ValueError to the moment the data is materialized.
    if len(names) != len(set(names)):
        raise ValueError("fields must not contain duplicate names")
    return cls(collection, fields=names)

select(*columns)

Keep only the given output columns (applied after filter_rows).

Parameters:

Name Type Description Default
*columns str

One or more distinct field names.

()

Returns:

Type Description
EnteiDataFrame

New frame with projection set.

Raises:

Type Description
ValueError

If no columns, or if any name is duplicated.

Source code in packages/entie/src/entie/dataframe.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def select(self, *columns: str) -> EnteiDataFrame:
    """Return a new frame whose output is limited to ``columns``.

    The projection is applied after every ``filter_rows`` predicate.

    Parameters
    ----------
    *columns:
        At least one field name; no name may appear twice.

    Returns
    -------
    EnteiDataFrame
        New frame carrying this projection together with the current
        collection, fields and filters.

    Raises
    ------
    ValueError
        When called without names, or when a name is repeated.
    """
    requested = tuple(columns)
    if len(requested) == 0:
        raise ValueError("select() requires at least one column name")
    distinct = set(requested)
    if len(distinct) != len(requested):
        raise ValueError("select() column names must be unique")
    return EnteiDataFrame(
        self._collection,
        fields=self._fields,
        filters=self._filters,
        projection=requested,
    )

EntieDatabase

Handle for a single MongoDB database (PyMongo Database).

Must wrap a PyMongo `pymongo.database.Database`, not a `pymongo.collection.Collection`. Use `collection()` / `table()` to obtain collections by name.

Source code in packages/entie/src/entie/client.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class EntieDatabase:
    """Name-based access to the collections of one MongoDB database.

    Wraps a PyMongo :class:`~pymongo.database.Database` (not a
    :class:`~pymongo.collection.Collection`). Collections are looked up by
    name with :meth:`collection` or its alias :meth:`table`.
    """

    __slots__ = ("_db",)

    def __init__(self, db: Any) -> None:
        """Store the database handle.

        Parameters
        ----------
        db:
            PyMongo :class:`~pymongo.database.Database` (e.g. ``client["app"]``).
        """
        self._db = db

    @property
    def raw(self) -> Any:
        """The wrapped PyMongo :class:`~pymongo.database.Database` object."""
        return self._db

    def collection(self, name: str) -> Any:
        """Look up ``name`` on the wrapped database by subscription (``db[name]``)."""
        database = self._db
        return database[name]

    def table(self, name: str) -> Any:
        """Table-style alias: delegates to :meth:`collection`."""
        handle = self.collection(name)
        return handle

    def list_collection_names(self) -> list[str]:
        """Return the database's ``list_collection_names()`` result as a new list."""
        names = self._db.list_collection_names()
        return list(names)

    def tables(self) -> list[str]:
        """Table-style alias: delegates to :meth:`list_collection_names`."""
        names = self.list_collection_names()
        return names

raw property

The underlying PyMongo :class:~pymongo.database.Database.

__init__(db)

Parameters:

Name Type Description Default
db Any

PyMongo :class:~pymongo.database.Database (e.g. client["app"]).

required
Source code in packages/entie/src/entie/client.py
80
81
82
83
84
85
86
def __init__(self, db: Any) -> None:
    """Store the database handle; no validation is performed here.

    Parameters
    ----------
    db:
        PyMongo :class:`~pymongo.database.Database` (e.g. ``client["app"]``).
    """
    self._db = db

collection(name)

Return the named :class:~pymongo.collection.Collection.

Source code in packages/entie/src/entie/client.py
93
94
95
def collection(self, name: str) -> Any:
    """Look up ``name`` on the wrapped database by subscription (``db[name]``).

    For a PyMongo database this yields the named
    :class:`~pymongo.collection.Collection`.
    """
    database = self._db
    return database[name]

list_collection_names()

List collection names in this database (see PyMongo list_collection_names).

Source code in packages/entie/src/entie/client.py
101
102
103
def list_collection_names(self) -> list[str]:
    """Return the database's ``list_collection_names()`` result as a new list."""
    names = self._db.list_collection_names()
    return list(names)

table(name)

Alias for :meth:collection (collection-as-table naming).

Source code in packages/entie/src/entie/client.py
97
98
99
def table(self, name: str) -> Any:
    """Table-style alias: returns whatever :meth:`collection` returns for ``name``."""
    handle = self.collection(name)
    return handle

tables()

Same as :meth:list_collection_names.

Source code in packages/entie/src/entie/client.py
105
106
107
def tables(self) -> list[str]:
    """Table-style alias: returns whatever :meth:`list_collection_names` returns."""
    names = self.list_collection_names()
    return names

EntieMongoClient

Thin wrapper around :class:pymongo.mongo_client.MongoClient.

Use `database()` to get an `EntieDatabase`, then `EntieDatabase.collection()` or `EntieDatabase.table()` to obtain collections for `EnteiDataFrame.from_collection()` and inserts.

Source code in packages/entie/src/entie/client.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class EntieMongoClient:
    """Small wrapper over a :class:`pymongo.mongo_client.MongoClient`.

    Call :meth:`database` for an :class:`EntieDatabase`; its
    :meth:`EntieDatabase.collection` / :meth:`EntieDatabase.table` results feed
    :meth:`EnteiDataFrame.from_collection` and inserts. Usable as a context
    manager: leaving the ``with`` block closes the client.
    """

    __slots__ = ("_client",)

    def __init__(self, client: MongoClient[Any]) -> None:
        """Keep a reference to an already constructed PyMongo client.

        Parameters
        ----------
        client:
            Connected :class:`~pymongo.mongo_client.MongoClient` instance.
        """
        self._client = client

    @property
    def raw(self) -> MongoClient[Any]:
        """The wrapped :class:`~pymongo.mongo_client.MongoClient`."""
        return self._client

    def database(self, name: str, *, codec_options: Any | None = None) -> EntieDatabase:
        """Wrap the database called ``name``.

        Parameters
        ----------
        name:
            MongoDB database name.
        codec_options:
            Optional BSON :class:`~bson.codec_options.CodecOptions` for this database.

        Returns
        -------
        EntieDatabase
            Built from ``client[name]`` when ``codec_options`` is ``None``,
            otherwise from ``client.get_database(name, codec_options=...)``.
        """
        if codec_options is None:
            handle = self._client[name]
        else:
            handle = self._client.get_database(name, codec_options=codec_options)
        return EntieDatabase(handle)

    def close(self) -> None:
        """Call ``close()`` on the wrapped PyMongo client (releases sockets)."""
        client = self._client
        client.close()

    def __enter__(self) -> EntieMongoClient:
        """Enter context: returns ``self``; :meth:`__exit__` closes the client."""
        return self

    def __exit__(self, *_args: object) -> None:
        """Exit context: calls :meth:`close`; exceptions are not suppressed."""
        self.close()

raw property

The underlying :class:~pymongo.mongo_client.MongoClient.

__enter__()

Enter context: returns self. The underlying client is closed by `__exit__` when the `with` block ends.

Source code in packages/entie/src/entie/client.py
61
62
63
def __enter__(self) -> EntieMongoClient:
    """Enter context: returns ``self``.

    Nothing is opened here; the matching ``__exit__`` is what closes the client.
    """
    return self

__exit__(*_args)

Exit context: calls :meth:close.

Source code in packages/entie/src/entie/client.py
65
66
67
def __exit__(self, *_args: object) -> None:
    """Exit context: calls :meth:`close`.

    The exception details passed by the ``with`` statement are ignored, and
    the implicit ``None`` return means exceptions are not suppressed.
    """
    self.close()

__init__(client)

Wrap an existing PyMongo client.

Parameters:

Name Type Description Default
client MongoClient[Any]

Connected :class:~pymongo.mongo_client.MongoClient instance.

required
Source code in packages/entie/src/entie/client.py
23
24
25
26
27
28
29
30
31
def __init__(self, client: MongoClient[Any]) -> None:
    """Wrap an existing PyMongo client.

    The client is stored as-is; this constructor performs no validation and
    does not open a connection itself.

    Parameters
    ----------
    client:
        Connected :class:`~pymongo.mongo_client.MongoClient` instance.
    """
    self._client = client

close()

Close the underlying PyMongo client (releases sockets).

Source code in packages/entie/src/entie/client.py
57
58
59
def close(self) -> None:
    """Call ``close()`` on the wrapped PyMongo client (releases sockets)."""
    client = self._client
    client.close()

database(name, *, codec_options=None)

Return a database by name.

Parameters:

Name Type Description Default
name str

MongoDB database name.

required
codec_options Any | None

Optional BSON :class:~bson.codec_options.CodecOptions for this database.

None

Returns:

Type Description
EntieDatabase

Wrapper around client[name] or get_database(...).

Source code in packages/entie/src/entie/client.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def database(self, name: str, *, codec_options: Any | None = None) -> EntieDatabase:
    """Wrap the database called ``name``.

    Parameters
    ----------
    name:
        MongoDB database name.
    codec_options:
        Optional BSON :class:`~bson.codec_options.CodecOptions` for this database.

    Returns
    -------
    EntieDatabase
        Built from ``client[name]`` when ``codec_options`` is ``None``,
        otherwise from ``client.get_database(name, codec_options=...)``.
    """
    if codec_options is None:
        handle = self._client[name]
    else:
        handle = self._client.get_database(name, codec_options=codec_options)
    return EntieDatabase(handle)

MongoRoot dataclass

Carrier for a collection plus optional fixed column list for materialization.

Used with :func:~entei_core.mongo_root_to_column_dict to produce dict[str, list] with one list per top-level field.

Parameters:

Name Type Description Default
collection Any

A PyMongo :class:~pymongo.collection.Collection or compatible (e.g. mongomock).

required
fields tuple[str, ...] | None

Column order and membership for output. If None, field names are the union of top-level keys in all documents (sorted). If () (empty tuple), no columns are emitted even when documents exist. If non-empty, names must be unique. For an empty collection with fields is None, the result has no columns.

None

Raises:

Type Description
ValueError

If fields contains duplicate names.

Source code in packages/entei-core/src/entei_core/mongo_root.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@dataclass(frozen=True, slots=True)
class MongoRoot:
    """Carrier for a collection plus optional fixed column list for materialization.

    Used with :func:`~entei_core.mongo_root_to_column_dict` to produce
    ``dict[str, list]`` with one list per top-level field.

    Parameters
    ----------
    collection:
        A PyMongo :class:`~pymongo.collection.Collection` or compatible (e.g. mongomock).
    fields:
        Column order and membership for output. If ``None``, field names are the
        union of top-level keys in all documents (sorted). If ``()`` (empty tuple),
        no columns are emitted even when documents exist. If non-empty, names must
        be unique. For an **empty** collection with ``fields is None``, the result
        has no columns.

    Raises
    ------
    ValueError
        If ``fields`` contains duplicate names.
    """

    collection: Any
    fields: tuple[str, ...] | None = None

    def __post_init__(self) -> None:
        """Validate ``fields`` invariants."""
        if self.fields is not None and len(self.fields) != len(set(self.fields)):
            raise ValueError("fields must not contain duplicate names")

__post_init__()

Validate fields invariants.

Source code in packages/entei-core/src/entei_core/mongo_root.py
36
37
38
39
def __post_init__(self) -> None:
    """Reject a ``fields`` value that repeats a name; ``None`` is accepted as-is."""
    names = self.fields
    if names is None:
        # Nothing to validate when the column list is left to inference.
        return
    if len(names) != len(set(names)):
        raise ValueError("fields must not contain duplicate names")

Records

Rows staged for insertion into a MongoDB collection via PyMongo insert_many.

Source code in packages/entie/src/entie/io/records.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class Records:
    """A batch of documents bound to a database, written with PyMongo ``insert_many``."""

    __slots__ = ("_database", "_rows")

    def __init__(self, rows: list[dict[str, Any]], *, database: EntieDatabase) -> None:
        """Bind ``rows`` to ``database`` without copying the list.

        Parameters
        ----------
        rows:
            BSON-compatible document dicts to insert.
        database:
            Target database; collection is chosen in :meth:`insert_into`.
        """
        self._database, self._rows = database, rows

    @classmethod
    def from_list(
        cls,
        rows: list[dict[str, Any]],
        *,
        database: EntieDatabase,
    ) -> Records:
        """Create a batch from a copy of ``rows``.

        Parameters
        ----------
        rows:
            Documents to insert. Only the list is copied; the dicts inside it
            are shared with the caller.
        database:
            :class:`~entie.client.EntieDatabase` backing the target collections.

        Returns
        -------
        Records
            Call :meth:`insert_into` to perform the insert.
        """
        snapshot = list(rows)
        return cls(snapshot, database=database)

    def insert_into(self, table: str) -> Any:
        """Write the whole batch to the collection called ``table``.

        Parameters
        ----------
        table:
            Collection name on ``database`` (MongoDB collection, not SQL table).

        Returns
        -------
        InsertManyResult or None
            PyMongo ``insert_many`` result, or ``None`` if there are zero rows.
        """
        # The collection is resolved first, even when there is nothing to insert.
        target = self._database.collection(table)
        pending = self._rows
        return target.insert_many(pending) if pending else None

__init__(rows, *, database)

Parameters:

Name Type Description Default
rows list[dict[str, Any]]

BSON-compatible document dicts to insert.

required
database EntieDatabase

Target database; collection is chosen in :meth:insert_into.

required
Source code in packages/entie/src/entie/io/records.py
15
16
17
18
19
20
21
22
23
24
def __init__(self, rows: list[dict[str, Any]], *, database: EntieDatabase) -> None:
    """Bind ``rows`` to ``database`` without copying the list.

    Parameters
    ----------
    rows:
        BSON-compatible document dicts to insert.
    database:
        Target database; collection is chosen in :meth:`insert_into`.
    """
    self._database, self._rows = database, rows

from_list(rows, *, database) classmethod

Copy rows into a new list and wrap with database.

Parameters:

Name Type Description Default
rows list[dict[str, Any]]

Documents to insert (shallow-copied list; dicts are not deep-copied).

required
database EntieDatabase

:class:~entie.client.EntieDatabase backing the target collections.

required

Returns:

Type Description
Records

Call :meth:insert_into to perform the insert.

Source code in packages/entie/src/entie/io/records.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@classmethod
def from_list(
    cls,
    rows: list[dict[str, Any]],
    *,
    database: EntieDatabase,
) -> Records:
    """Copy ``rows`` into a new list and wrap with ``database``.

    Parameters
    ----------
    rows:
        Documents to insert (shallow-copied list; dicts are not deep-copied).
    database:
        :class:`~entie.client.EntieDatabase` backing the target collections.

    Returns
    -------
    Records
        Call :meth:`insert_into` to perform the insert.
    """
    return cls(list(rows), database=database)

insert_into(table)

Insert all rows into the named collection.

Parameters:

Name Type Description Default
table str

Collection name on database (MongoDB collection, not SQL table).

required

Returns:

Type Description
InsertManyResult or None

PyMongo insert_many result, or None if there are zero rows.

Source code in packages/entie/src/entie/io/records.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def insert_into(self, table: str) -> Any:
    """Write the whole batch to the collection called ``table``.

    Parameters
    ----------
    table:
        Collection name on ``database`` (MongoDB collection, not SQL table).

    Returns
    -------
    InsertManyResult or None
        PyMongo ``insert_many`` result, or ``None`` if there are zero rows.
    """
    # The collection is resolved first, even when there is nothing to insert.
    target = self._database.collection(table)
    pending = self._rows
    return target.insert_many(pending) if pending else None

col(name)

Return name unchanged (readable select(col("x"))-style spelling).

Parameters:

Name Type Description Default
name str

Column / field name.

required

Returns:

Type Description
str

The same string name.

Source code in packages/entie/src/entie/expressions/__init__.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def col(name: str) -> str:
    """Return ``name`` unchanged (readable ``select(col("x"))``-style spelling).

    This is an identity function: no expression object is built and no
    validation of ``name`` takes place.

    Parameters
    ----------
    name:
        Column / field name.

    Returns
    -------
    str
        The same string ``name``.
    """
    return name

column(name)

Alias of :func:col.

Parameters:

Name Type Description Default
name str

Column / field name.

required

Returns:

Type Description
str

The same string name.

Source code in packages/entie/src/entie/expressions/__init__.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def column(name: str) -> str:
    """Alias of :func:`col`.

    Implemented as its own identity function (it returns ``name`` directly
    rather than calling :func:`col`).

    Parameters
    ----------
    name:
        Column / field name.

    Returns
    -------
    str
        The same string ``name``.
    """
    return name

connect(uri=None, *, database=None, client=None, **client_kwargs)

connect(uri: str | None = None, *, database: str, client: MongoClient[Any] | None = None, **client_kwargs: Any) -> EntieDatabase
connect(uri: str | None = None, *, database: None = None, client: MongoClient[Any] | None = None, **client_kwargs: Any) -> EntieMongoClient

Open a MongoDB connection or wrap an existing client.

If client is omitted, builds a new :class:~pymongo.mongo_client.MongoClient from uri, or from the ENTIE_URI environment variable when uri is omitted.

Parameters:

Name Type Description Default
uri str | None

MongoDB connection URI. Must not be combined with client (passing both raises ValueError). If both uri and client are omitted, ENTIE_URI is read.

None
database str | None

If set, returns :class:EntieDatabase for that database; otherwise returns :class:EntieMongoClient.

None
client MongoClient[Any] | None

Existing client to wrap. Do not pass uri at the same time.

None
**client_kwargs Any

Forwarded to :class:~pymongo.mongo_client.MongoClient only when a new client is constructed (client is None).

{}

Returns:

Type Description
EntieMongoClient or EntieDatabase

Client wrapper, or database handle when database is not None.

Raises:

Type Description
ValueError

If neither a resolvable URI nor client is given; or if both uri and client are passed.

Source code in packages/entie/src/entie/client.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def connect(
    uri: str | None = None,
    *,
    database: str | None = None,
    client: MongoClient[Any] | None = None,
    **client_kwargs: Any,
) -> EntieMongoClient | EntieDatabase:
    """Return an entie wrapper around a new or caller-supplied MongoDB client.

    Exactly one connection source is used: an existing ``client``, an explicit
    ``uri``, or (when both are omitted) the URI stored in the ``ENTIE_URI``
    environment variable.

    Parameters
    ----------
    uri:
        MongoDB connection URI used to build a new
        :class:`~pymongo.mongo_client.MongoClient`. Must not be combined with
        ``client``. When both ``uri`` and ``client`` are omitted, the value of
        ``ENTIE_URI`` is used instead.
    database:
        Name of a database to select. When given, the result is the
        :class:`EntieDatabase` for that name; when ``None``, the
        :class:`EntieMongoClient` wrapper itself is returned.
    client:
        Already-constructed client to wrap instead of opening a new one.
    **client_kwargs:
        Extra keyword arguments passed to
        :class:`~pymongo.mongo_client.MongoClient`. They are only used when
        this function creates the client; with ``client=...`` they are not
        consulted.

    Returns
    -------
    EntieMongoClient or EntieDatabase
        The client wrapper, or a database handle if ``database`` was given.

    Raises
    ------
    ValueError
        If both ``uri`` and ``client`` are passed, or if no client is passed
        and neither ``uri`` nor ``ENTIE_URI`` yields a non-empty URI.
    """
    if client is not None:
        # A pre-built client already fixes the target; a URI would be ambiguous.
        if uri is not None:
            raise ValueError("Pass only one of uri=... or client=..., not both.")
    else:
        # Explicit argument wins; the environment is only the fallback.
        resolved = os.environ.get(_DEFAULT_URI_ENV) if uri is None else uri
        if not resolved:
            raise ValueError("Provide uri=..., set ENTIE_URI, or pass client=... to connect().")
        client = MongoClient(resolved, **client_kwargs)

    wrapped = EntieMongoClient(client)
    return wrapped if database is None else wrapped.database(database)

lit(value)

Return value unchanged (placeholder for literal-friendly APIs).

Parameters:

Name Type Description Default
value Any

Any object.

required

Returns:

Type Description
Any

The input value.

Source code in packages/entie/src/entie/expressions/__init__.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def lit(value: Any) -> Any:
    """Mark ``value`` as a literal; currently a pass-through.

    Exists so code written against literal-aware expression APIs has a
    function to call. No wrapping, copying, or conversion takes place.

    Parameters
    ----------
    value:
        Object of any type.

    Returns
    -------
    Any
        The exact object passed in as ``value``.
    """
    return value

materialize_root_data(data)

Normalize pipeline data: columnarize a MongoRoot; return anything else unchanged.

Parameters:

Name Type Description Default
data Any

Any value. If it is a MongoRoot, returns the columnar dict from mongo_root_to_column_dict; otherwise returns data unchanged.

required

Returns:

Type Description
Any

Columnar dict or the original data.

Source code in packages/entei-core/src/entei_core/_materialize.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def materialize_root_data(data: Any) -> Any:
    """Turn a :class:`MongoRoot` into columnar form; pass other values through.

    Parameters
    ----------
    data:
        Arbitrary pipeline value. Only :class:`MongoRoot` instances are
        converted (via :func:`mongo_root_to_column_dict`); every other object
        is handed back as-is.

    Returns
    -------
    Any
        The columnar dict for a :class:`MongoRoot`, otherwise ``data`` itself.
    """
    return mongo_root_to_column_dict(data) if isinstance(data, MongoRoot) else data

mongo_root_to_column_dict(root)

Run find() on root.collection and build aligned column lists.

Reads the entire cursor into memory. Only top-level keys participate; nested documents are values in a single cell.

Parameters:

Name Type Description Default
root MongoRoot

Collection and optional fields (see MongoRoot).

required

Returns:

Type Description
dict[str, list]

Keys are field names; each value is the column in document order.

Notes

When root.fields is None, keys are inferred from documents. When it is an empty tuple, returns {} for any document count.

Source code in packages/entei-core/src/entei_core/_materialize.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def mongo_root_to_column_dict(root: MongoRoot) -> dict[str, list[Any]]:
    """Run ``find()`` on ``root.collection`` and build aligned column lists.

    Reads the entire cursor into memory. Only top-level keys participate; nested
    documents are values in a single cell. A document that lacks a key
    contributes ``None`` to that column.

    Parameters
    ----------
    root:
        Collection and optional ``fields`` (see :class:`MongoRoot`).

    Returns
    -------
    dict[str, list]
        Keys are field names; each value is the column in document order, with
        exactly one cell per document.

    Notes
    -----
    When ``root.fields`` is ``None``, keys are inferred from the documents and
    emitted in sorted order (so an empty collection yields ``{}``). When it is
    an empty tuple, returns ``{}`` for any document count. Names repeated in
    ``root.fields`` are collapsed to their first occurrence.
    """
    docs: list[dict[str, Any]] = list(root.collection.find())

    if root.fields is not None:
        # De-duplicate while keeping first-seen order: a repeated field name
        # maps to a single output column, so it must be filled only once per
        # document or that column would grow longer than the others.
        keys = list(dict.fromkeys(root.fields))
    else:
        keys = sorted({key for doc in docs for key in doc})

    # Building column-by-column also covers the empty-collection case:
    # every requested key simply maps to an empty list.
    return {key: [doc.get(key) for doc in docs] for key in keys}