update parquet schema

Signed-off-by: zyphlar <zyphlar@gmail.com>
This commit is contained in:
zyphlar
2026-01-03 23:38:14 -08:00
parent 4d862b0a8b
commit e9406c0f36
2 changed files with 13 additions and 5 deletions

View File

@@ -133,19 +133,26 @@ def process_parquet_streaming(parquet_url: str, output_dir: Path, batch_size: in
logger.info(f"Reading parquet file: {parquet_url}")
# First, inspect the schema to understand the columns
try:
schema_result = conn.execute(f"DESCRIBE SELECT * FROM read_parquet('{parquet_url}') LIMIT 0").fetchall()
logger.info(f"Parquet schema: {[col[0] for col in schema_result]}")
except Exception as e:
logger.warning(f"Could not read schema: {e}")
# Dictionary to accumulate points per country
country_points: Dict[str, List[Tuple[float, float, str]]] = defaultdict(list)
# Stream the parquet file in batches
# Assuming parquet has columns: latitude, longitude, id (or similar)
# Adjust column names based on actual Panoramax parquet schema
# Geoparquet stores geometry as GEOMETRY type
# Use DuckDB spatial functions to extract lat/lon
query = f"""
SELECT
latitude as lat,
longitude as lon,
ST_Y(geometry) as lat,
ST_X(geometry) as lon,
id as image_id
FROM read_parquet('{parquet_url}')
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
WHERE geometry IS NOT NULL
"""
try: