Skip to content

Cross-backend table creation and insertion #8115

@NickCrews

Description

@NickCrews

Is your feature request related to a problem?

Inspired by #8110

I use duckdb for local analysis and processing and a hosted postgres to house our production data. I sometimes want to move data between them.

It would be great if I could do

# Three independent backends: two local engines and the hosted Postgres.
duckdb = ibis.duckdb.connect()
sqlite = ibis.sqlite.connect()
pg = ibis.postgres.connect(URI)

# Currently I have something monkeypatched into the PG backend so that a
# table expression bound to the duckdb backend can be used as the source:
pg.create_table("new_table", duckdb.table("t"))

# It would be neat if this even worked transitively, i.e. sqlite -> postgres
# with duckdb acting as a hidden intermediary:
pg.create_table("new_table", sqlite.table("t"))

# The full DDL APIs would be ideal (views as well as tables):
pg.create_view("new_table", duckdb.table("t"))

Describe the solution you'd like

Current implementation (not quite runnable as-is; I can make it runnable if you want):

Details
"""Utils for interacting with PostgreSQL databases."""
from dataclasses import dataclass
import re

import ibis
from ibis.backends.postgres import Backend as PGBackend
from ibis.expr.types import Table
from noatak.ibis import get_DuckDBPyConnection

from atlas.env import read_env


def connect(uri: str | None = None) -> PGBackend:
    """Connect to a PostgreSQL database.

    This is a wrapper around ``ibis.connect()`` that patches the returned
    backend's ``create_table`` so that it also accepts duckdb-backed tables
    as the source and copies them into PostgreSQL.

    Parameters
    ----------
    uri
        PostgreSQL connection URI. When omitted, the value of ``PG_URI``
        returned by ``read_env()`` is used. The URI is normalized through
        ``_PGConnInfo`` before connecting, so query parameters (for example
        ``?sslmode=require``) are not forwarded to ``ibis.connect()``.

    Returns
    -------
    PGBackend
        The connected backend, with ``create_table`` overridden on this
        instance only.

    Raises
    ------
    ValueError
        If the URI cannot be parsed by ``_PGConnInfo.from_uri``.
    """
    if uri is None:
        uri = read_env()["PG_URI"]
    conn_info = _PGConnInfo.from_uri(uri)
    backend = ibis.connect(conn_info.to_uri())

    def create_table(
        self, name, obj=None, *, schema=None, database=None, temp=False, overwrite=False
    ) -> Table | None:
        comes_from_duckdb = (
            isinstance(obj, Table)
            and obj._find_backend(use_default=True).name == "duckdb"
        )
        if not comes_from_duckdb:
            # Look the method up on the class: the instance attribute is this
            # very function, so ``self.create_table`` would recurse forever.
            # Going through ``PGBackend`` (rather than ``super(PGBackend, self)``)
            # also picks up an implementation defined on PGBackend itself.
            return PGBackend.create_table(
                self,
                name,
                obj=obj,
                schema=schema,
                database=database,
                temp=temp,
                overwrite=overwrite,
            )

        # The duckdb import path derives everything from the source table.
        if schema is not None:
            raise ValueError("Cannot specify schema for duckdb table")
        if database is not None:
            raise ValueError("Cannot specify database for duckdb table")
        return _import_duckdb_table(
            table=obj,
            pg_connection_info=conn_info,
            new_name=name,
            temp=temp,
            overwrite=overwrite,
        )

    create_table.__doc__ = PGBackend.create_table.__doc__
    # Bind to this one instance so other PGBackend objects are unaffected.
    backend.create_table = create_table.__get__(backend)
    return backend


@dataclass
class _PGConnInfo:
    """Connection parameters extracted from a PostgreSQL URI.

    ``user`` and ``password`` are stored percent-decoded, i.e. as the literal
    credentials. ``host`` is kept exactly as written in the URI, so it still
    contains a ``:port`` suffix if the URI had one. Query parameters such as
    ``?sslmode=require`` are not retained.
    """

    dbname: str
    host: str
    user: str
    password: str

    @classmethod
    def from_uri(cls, uri: str) -> "_PGConnInfo":
        """Parse a connection string into a PGConnInfo object.

        eg
        postgresql://NickCrews:PASSWORD@ep-white-base-51487250.us-west-2.aws.neon.tech/scg-db?sslmode=require

        Both the ``postgresql://`` and ``postgres://`` schemes are accepted.
        Anything from the first ``?`` onwards is ignored.

        Raises
        ------
        ValueError
            If ``uri`` does not have the ``scheme://user:password@host/dbname``
            shape.
        """
        from urllib.parse import unquote

        # The dbname group stops at "?", so a trailing query string needs no
        # separate stripping.
        pattern = r"postgres(?:ql)?://(?P<user>[^:]+):(?P<password>[^@]+)@(?P<host>[^/]+)/(?P<dbname>[^?]+)"
        match = re.match(pattern, uri)
        if match is None:
            raise ValueError(f"Could not parse connection string: {uri}")
        parts = match.groupdict()
        # Credentials with reserved characters must be percent-encoded inside
        # a URI; decode them so consumers see the real values.
        parts["user"] = unquote(parts["user"])
        parts["password"] = unquote(parts["password"])
        return cls(**parts)

    def to_uri(self) -> str:
        """Return a ``postgresql://`` URI, re-encoding the credentials."""
        from urllib.parse import quote

        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return f"postgresql://{user}:{password}@{self.host}/{self.dbname}"


def _import_duckdb_table(
    *,
    table: Table,
    pg_connection_info: str | _PGConnInfo,
    new_name: str,
    temp: bool = False,
    overwrite: bool = False,
) -> None:
    """Create a table in PostgreSQL from a (duckdb-based) Ibis table.

    The PostgreSQL database is attached to the table's DuckDB connection
    under the alias ``pgdb`` (unless a database with that name is already
    attached) and the table is copied with ``CREATE TABLE ... AS``.

    Parameters
    ----------
    table
        Source table expression; it is cached first (see below).
    pg_connection_info
        Target database, either as a URI string or a parsed ``_PGConnInfo``.
    new_name
        Name of the table to create. It is interpolated into the SQL
        verbatim, so it must already be a valid (and trusted) identifier.
    temp
        Emit ``TEMPORARY`` in the DDL.
    overwrite
        Emit ``CREATE OR REPLACE`` so an existing table of the same name is
        replaced; otherwise a plain ``CREATE`` is emitted.
    """
    if not isinstance(pg_connection_info, _PGConnInfo):
        pg_connection_info = _PGConnInfo.from_uri(pg_connection_info)
    # we need to cache the table so that:
    # 1. the table name is qualified with main.ibis_cache_rwe632 (or whatever)
    #    so that we can load it into the pgdb database
    # 2. In case it is a memtable, so that it is actually materialized into
    #    a backend, so that ._find_backend() doesn't barf.
    table = table.cache()
    backend = table._find_backend()
    ddb_conn = get_DuckDBPyConnection(backend)
    attached = {row[0] for row in ddb_conn.execute("SHOW DATABASES").fetchall()}
    if "pgdb" not in attached:
        info = pg_connection_info
        conninfo = " ".join(
            f"{key}={_libpq_value(value)}"
            for key, value in (
                ("dbname", info.dbname),
                ("host", info.host),
                ("user", info.user),
                ("password", info.password),
            )
        )
        # The conninfo travels inside a SQL string literal: double any quote.
        sql_literal = conninfo.replace("'", "''")
        ddb_conn.sql(f"ATTACH '{sql_literal}' AS pgdb (TYPE postgres);")
    src_sql = str(ibis.to_sql(table, dialect="duckdb"))
    ddb_conn.sql(
        _create_table_as_sql(new_name, src_sql, temp=temp, overwrite=overwrite)
    )


def _libpq_value(value: str) -> str:
    """Quote one libpq keyword/value setting, but only when it is required."""
    needs_quoting = not value or any(
        ch.isspace() or ch in "'\\" for ch in value
    )
    if not needs_quoting:
        return value
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _create_table_as_sql(
    new_name: str, src_sql: str, *, temp: bool, overwrite: bool
) -> str:
    """Build the ``CREATE [OR REPLACE] [TEMPORARY] TABLE pgdb.<name> AS`` DDL."""
    create_kw = "CREATE OR REPLACE" if overwrite else "CREATE"
    temp_kw = " TEMPORARY" if temp else ""
    return f"{create_kw}{temp_kw} TABLE pgdb.{new_name} AS {src_sql}"

What version of ibis are you running?

main

What backend(s) are you using, if any?

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    feature: Features or general enhancements

    Type

    No type
    No fields configured for issues without a type.

    Projects

    Status
    todo

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions