Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions datafusion-pg-catalog/export_pg_catalog_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ import json
import numpy as np

def pg_type_to_arrow_type(pg_type, is_nullable=True):
"""Map PostgreSQL types to Arrow types"""
"""Map PostgreSQL types to Arrow types

Every oid-alias type (oid, regproc, regtype, regclass, regnamespace, ...)
is stored as int32: the underlying object identifier. This lets the
`datafusion-pg-catalog` oid-coercion analyzer rule resolve name/numeric
string comparisons uniformly across all such columns.
"""
type_mapping = {
'bigint': pa.int64(),
'integer': pa.int32(),
Expand All @@ -173,10 +179,20 @@ def pg_type_to_arrow_type(pg_type, is_nullable=True):
'character varying': pa.string(),
'character': pa.string(),
'name': pa.string(),
'oid': pa.int32(), # OIDs are signed
'regproc': pa.string(),
'regtype': pa.string(),
'regclass': pa.string(),
# --- oid-alias family: all stored as int32 (the raw oid) ---
'oid': pa.int32(),
'regproc': pa.int32(),
'regprocedure': pa.int32(),
'regoper': pa.int32(),
'regoperator': pa.int32(),
'regclass': pa.int32(),
'regtype': pa.int32(),
'regnamespace': pa.int32(),
'regrole': pa.int32(),
'regconfig': pa.int32(),
'regdictionary': pa.int32(),
'regcollation': pa.int32(),
# --- end oid-alias family ---
'timestamp': pa.timestamp('us'),
'timestamp without time zone': pa.timestamp('us'),
'timestamp with time zone': pa.timestamp('us', tz='UTC'),
Expand Down Expand Up @@ -207,6 +223,30 @@ def pg_type_to_arrow_type(pg_type, is_nullable=True):
arrow_type = type_mapping.get(pg_type, pa.string())
return arrow_type

# Object-identifier alias types in PostgreSQL. Columns of these types get the
# Arrow field metadata `pg.oid_alias=<pg_type>`, which the catalog's
# oid-coercion analyzer rule reads to resolve name/numeric string comparisons
# the way Postgres does.
OID_ALIAS_TYPES = {
    'oid',
    'regproc',
    'regprocedure',
    'regoper',
    'regoperator',
    'regclass',
    'regtype',
    'regnamespace',
    'regrole',
    'regconfig',
    'regdictionary',
    'regcollation',
}

# Everything in the family except plain `oid`. A plain SELECT of a reg* column
# yields its display (name) form rather than the integer, so these columns
# need an explicit `::oid` cast at SELECT time.
REG_OID_TYPES = OID_ALIAS_TYPES.difference({'oid'})


def select_expr(col_name, data_type):
    """Return the SELECT-list expression for a single column.

    The column name is wrapped in double quotes, with any embedded double
    quote doubled. When ``data_type`` is one of ``REG_OID_TYPES`` the column
    is cast to ``oid`` and aliased back to its own name; otherwise the quoted
    identifier is returned unchanged.
    """
    escaped = col_name.replace('"', '""')
    identifier = f'"{escaped}"'
    if data_type not in REG_OID_TYPES:
        return identifier
    return f'{identifier}::oid AS {identifier}'

def convert_value(value, arrow_type):
"""Convert PostgreSQL value to Arrow-compatible value"""
if value is None:
Expand Down Expand Up @@ -325,7 +365,9 @@ def export_table_to_arrow(conn, schema_name, table_name, output_dir):
print(f"{'Column Name':<30} {'Type':<20} {'Nullable':<10} {'Description':<40}")
print("-" * 100)

# Build Arrow schema
# Build Arrow schema. For oid-alias columns we attach field metadata
# so the Rust catalog's oid-coercion analyzer rule can resolve string
# comparisons against them.
arrow_fields = []
for col_name, data_type, is_nullable, default, comment in columns:
nullable = "YES" if is_nullable == 'YES' else "NO"
Expand All @@ -334,13 +376,26 @@ def export_table_to_arrow(conn, schema_name, table_name, output_dir):

# Create Arrow field
arrow_type = pg_type_to_arrow_type(data_type, is_nullable == 'YES')
field = pa.field(col_name, arrow_type, nullable=(is_nullable == 'YES'))
metadata = {}
if data_type in OID_ALIAS_TYPES:
metadata['pg.oid_alias'] = data_type
print(f"{'':>62} -> annotated pg.oid_alias={data_type}")
field = pa.field(
col_name, arrow_type,
nullable=(is_nullable == 'YES'),
metadata=metadata,
)
arrow_fields.append(field)

arrow_schema = pa.schema(arrow_fields)

# Fetch all data
cur.execute(f"SELECT * FROM {schema_name}.{table_name}")
# Fetch all data. reg* oid-alias columns are cast to oid at SELECT time
# so they are returned as integers (matching their stored Arrow type).
select_cols = ", ".join(
select_expr(name, dtype)
for (name, dtype, _, _, _) in columns
)
cur.execute(f'SELECT {select_cols} FROM {schema_name}."{table_name}"')

# Process data in batches to handle large tables
batch_size = 10000
Expand Down
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_am.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_amop.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_attrdef.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_attribute.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_authid.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_cast.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_class.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_collation.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_database.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_depend.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_enum.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_extension.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_index.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_inherits.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_language.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_namespace.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_opclass.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_opfamily.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_policy.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_proc.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_range.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_rewrite.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_seclabel.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_sequence.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_shdepend.feather
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_transform.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_trigger.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_ts_config.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_ts_dict.feather
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_ts_parser.feather
Binary file not shown.
Binary file not shown.
Binary file modified datafusion-pg-catalog/pg_catalog_arrow_exports/pg_type.feather
Binary file not shown.
Binary file not shown.
48 changes: 45 additions & 3 deletions datafusion-pg-catalog/pg_to_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
import sys

def map_postgresql_to_arrow_type(type_oid: int) -> pa.DataType:
"""Map PostgreSQL data types to Arrow data types."""
"""Map PostgreSQL data types to Arrow data types.

Every oid-alias type (oid, regproc, regtype, regclass, regnamespace, ...)
is stored as int32 (the raw object identifier) so the
`datafusion-pg-catalog` oid-coercion analyzer rule can resolve name/numeric
string comparisons uniformly.
"""
# Map OIDs to Arrow types
type_mapping = {
# Integer types (OIDs from PostgreSQL documentation)
Expand Down Expand Up @@ -61,6 +67,31 @@ def map_postgresql_to_arrow_type(type_oid: int) -> pa.DataType:

return type_mapping.get(type_oid, pa.string()) # Fallback to string

# PostgreSQL oid-alias type names (`oid` plus the `reg*` family). A result
# column whose type name (resolved from its type OID via pg_type) is in this
# set gets `pg.oid_alias=<name>` field metadata for the catalog analyzer rule.
OID_ALIAS_TYPES_BY_NAME = {
    # plain object identifier
    'oid',
    # reg* aliases
    'regproc',
    'regprocedure',
    'regoper',
    'regoperator',
    'regclass',
    'regtype',
    'regnamespace',
    'regrole',
    'regconfig',
    'regdictionary',
    'regcollation',
}


def load_type_names(conn) -> dict:
    """Build a {type_oid: typname} map from pg_type for oid-alias detection.

    Args:
        conn: An open database connection exposing ``cursor()``.

    Returns:
        A dict mapping the first column (``oid``) of every ``pg_type`` row to
        its second column (``typname``).

    Raises:
        Whatever the driver raises from ``execute``/``fetchall``; the cursor
        is closed before the exception propagates.
    """
    cur = conn.cursor()
    try:
        cur.execute("SELECT oid, typname FROM pg_catalog.pg_type")
        # Rows are read positionally: (oid, typname).
        return {row[0]: row[1] for row in cur.fetchall()}
    finally:
        # Release the cursor even when the query or fetch fails.
        cur.close()


def is_oid_alias(type_name: str) -> bool:
    """Return True when ``type_name`` is listed in ``OID_ALIAS_TYPES_BY_NAME``."""
    known_aliases = OID_ALIAS_TYPES_BY_NAME
    return type_name in known_aliases

def export_query_to_feather(
connection_string: str,
query: str,
Expand All @@ -74,23 +105,30 @@ def export_query_to_feather(
conn = psycopg2.connect(connection_string)
cursor = conn.cursor(cursor_factory=RealDictCursor)

# type_oid -> typname, used to detect oid-alias columns for metadata.
type_names = load_type_names(conn)

# Execute query
cursor.execute(query)

# Get column information
columns = []
arrow_types = []
column_names = []
column_oid_kinds = [] # pg.oid_alias value per column, or None

for desc in cursor.description:
col_name = desc.name
col_oid = desc.type_code

arrow_type = map_postgresql_to_arrow_type(col_oid)
type_name = type_names.get(col_oid, '')
oid_kind = type_name if is_oid_alias(type_name) else None

columns.append(col_name)
arrow_types.append(arrow_type)
column_names.append(col_name)
column_oid_kinds.append(oid_kind)

# Process data in batches
all_data = {col: [] for col in columns}
Expand Down Expand Up @@ -122,8 +160,12 @@ def export_query_to_feather(
array = pa.array([str(x) if x is not None else None for x in all_data[col]], type=pa.string())
arrays.append(array)

# Create table and write to feather
table = pa.Table.from_arrays(arrays, names=column_names)
# Build schema with pg.oid_alias metadata on oid-alias columns.
fields = []
for name, arrow_type, oid_kind in zip(column_names, arrow_types, column_oid_kinds):
metadata = {b'pg.oid_alias': oid_kind.encode()} if oid_kind else {}
fields.append(pa.field(name, arrow_type, metadata=metadata))
table = pa.Table.from_arrays(arrays, schema=pa.schema(fields))
feather.write_feather(table, output_file)

print(f"Successfully exported {rows_processed} rows to {output_file}")
Expand Down
28 changes: 25 additions & 3 deletions datafusion-pg-catalog/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub mod context;
pub mod empty_table;
pub mod format_type;
pub mod has_privilege_udf;
pub mod oid_coercion_rule;
pub mod oid_field;
pub mod pg_attribute;
pub mod pg_class;
pub mod pg_database;
Expand Down Expand Up @@ -228,6 +230,19 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> SchemaProvider for PgCatalogSc
}
}

/// Backs the oid-coercion analyzer rule: build a pg_catalog table provider
/// synchronously so the rule can construct name->oid lookup subqueries at
/// plan time (data is fetched lazily at execution).
impl<C: CatalogInfo, P: PgCatalogContextProvider> oid_coercion_rule::OidLookupProvider
for PgCatalogSchemaProvider<C, P>
{
fn build_table_provider(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
self.build_table_by_name(name)?
.map(|t| t.try_into_table_provider())
.transpose()
}
}

impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P> {
pub fn try_new(
catalog_list: C,
Expand Down Expand Up @@ -1436,19 +1451,19 @@ where
P: PgCatalogContextProvider,
{
let static_tables = Arc::new(PgCatalogStaticTables::try_new()?);
let pg_catalog = PgCatalogSchemaProvider::try_new(
let pg_catalog = Arc::new(PgCatalogSchemaProvider::try_new(
session_context.state().catalog_list().clone(),
static_tables.clone(),
context_provider,
)?;
)?);
session_context
.catalog(catalog_name)
.ok_or_else(|| {
DataFusionError::Configuration(format!(
"Catalog not found when registering pg_catalog: {catalog_name}"
))
})?
.register_schema("pg_catalog", Arc::new(pg_catalog))?;
.register_schema("pg_catalog", pg_catalog.clone())?;

session_context.register_udf(create_current_database_udf());
session_context.register_udf(create_current_schema_udf());
Expand Down Expand Up @@ -1485,6 +1500,13 @@ where
session_context.register_udf(quote_ident_udf::create_quote_ident_udf());
session_context.register_udf(quote_ident_udf::create_parse_ident_udf());

// Make oid / oid-alias columns (relnamespace, atttypid, ...) compare
// against string literals the way Postgres does. Requires the schema
// provider handle (kept above) to resolve names -> oids at plan time.
session_context.add_analyzer_rule(Arc::new(oid_coercion_rule::OidStringCoercion::new(
pg_catalog.clone() as Arc<dyn oid_coercion_rule::OidLookupProvider>,
)));

Ok(())
}

Expand Down
Loading
Loading