mirror of
https://codeberg.org/comaps/comaps
synced 2026-01-14 08:04:23 +00:00
@@ -33,43 +33,169 @@ except ImportError:
|
||||
print("Error: duckdb is required. Install with: pip install duckdb", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
from shapely.geometry import Point, Polygon, MultiPolygon
|
||||
from shapely.strtree import STRtree
|
||||
except ImportError:
|
||||
print("Error: shapely is required. Install with: pip install shapely", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_country_polygons(polygons_file: Path) -> Dict[str, any]:
|
||||
def parse_poly_file(poly_path: Path) -> MultiPolygon:
|
||||
"""
|
||||
Load country polygons from packed_polygons.bin file.
|
||||
Parse an Osmosis .poly file and return a Shapely MultiPolygon.
|
||||
|
||||
This is a placeholder - actual implementation would need to parse the binary format.
|
||||
For now, we'll use a simpler approach with DuckDB spatial functions.
|
||||
.poly format:
|
||||
Line 1: Region name
|
||||
Section N: (numbered 1, 2, 3...)
|
||||
lon lat (pairs of coordinates)
|
||||
...
|
||||
END
|
||||
"""
|
||||
# TODO: Implement actual polygon loading from packed_polygons.bin
|
||||
# For MVP, we can use a simplified approach or require pre-processed country boundaries
|
||||
logger.warning("Country polygon loading not yet implemented - using fallback method")
|
||||
return {}
|
||||
with open(poly_path, 'r', encoding='utf-8') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
polygons = []
|
||||
current_coords = []
|
||||
in_section = False
|
||||
|
||||
def determine_country_from_coords(lat: float, lon: float, conn: duckdb.DuckDBPyConnection) -> str:
|
||||
"""
|
||||
Determine which country a coordinate belongs to.
|
||||
for line in lines[1:]: # Skip first line (region name)
|
||||
line = line.strip()
|
||||
|
||||
This uses a simple approach for MVP - can be enhanced later.
|
||||
Returns country name or "Unknown" if not found.
|
||||
"""
|
||||
# Simplified country detection for MVP
|
||||
# TODO: Use actual country polygons for accurate spatial join
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# For now, return a simplified country code based on rough lat/lon bounds
|
||||
# This is just for initial testing - real implementation needs proper spatial join
|
||||
if 40 < lat < 52 and -5 < lon < 10:
|
||||
return "France"
|
||||
elif 45 < lat < 48 and 5 < lon < 11:
|
||||
return "Switzerland"
|
||||
elif 43 < lat < 44 and 7 < lon < 8:
|
||||
return "Monaco"
|
||||
if line.upper() == 'END':
|
||||
if current_coords:
|
||||
# Close the polygon if needed
|
||||
if current_coords[0] != current_coords[-1]:
|
||||
current_coords.append(current_coords[0])
|
||||
|
||||
# Create polygon (need at least 3 points + closing point)
|
||||
if len(current_coords) >= 4:
|
||||
try:
|
||||
poly = Polygon(current_coords)
|
||||
|
||||
# If polygon is invalid, try to fix it
|
||||
if not poly.is_valid:
|
||||
# Try buffer(0) trick to fix self-intersections
|
||||
poly = poly.buffer(0)
|
||||
|
||||
# Only accept if it's now valid and is a Polygon or MultiPolygon
|
||||
if poly.is_valid and not poly.is_empty:
|
||||
if poly.geom_type == 'Polygon':
|
||||
polygons.append(poly)
|
||||
elif poly.geom_type == 'MultiPolygon':
|
||||
# Split multipolygon into individual polygons
|
||||
polygons.extend(poly.geoms)
|
||||
else:
|
||||
logger.debug(f"Skipping invalid section in {poly_path.name}")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error creating polygon in {poly_path.name}: {e}")
|
||||
|
||||
current_coords = []
|
||||
in_section = False
|
||||
continue
|
||||
|
||||
# Try to parse as section number
|
||||
try:
|
||||
int(line)
|
||||
in_section = True
|
||||
continue
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Parse coordinate pair
|
||||
if in_section:
|
||||
parts = line.split()
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
lon = float(parts[0])
|
||||
lat = float(parts[1])
|
||||
current_coords.append((lon, lat))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if not polygons:
|
||||
logger.warning(f"No valid polygons found in {poly_path.name}")
|
||||
return None
|
||||
|
||||
if len(polygons) == 1:
|
||||
return MultiPolygon([polygons[0]])
|
||||
else:
|
||||
return "Unknown"
|
||||
return MultiPolygon(polygons)
|
||||
|
||||
|
||||
def load_country_polygons(borders_dir: Path) -> Dict[str, MultiPolygon]:
|
||||
"""
|
||||
Load all .poly files from the borders directory.
|
||||
|
||||
Returns a dict mapping region name (without .poly extension) to MultiPolygon.
|
||||
"""
|
||||
logger.info(f"Loading .poly files from {borders_dir}")
|
||||
|
||||
poly_files = list(borders_dir.glob("*.poly"))
|
||||
logger.info(f"Found {len(poly_files)} .poly files")
|
||||
|
||||
polygons = {}
|
||||
|
||||
for poly_file in poly_files:
|
||||
region_name = poly_file.stem # Filename without .poly extension
|
||||
|
||||
try:
|
||||
multi_polygon = parse_poly_file(poly_file)
|
||||
if multi_polygon:
|
||||
polygons[region_name] = multi_polygon
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing {poly_file.name}: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"Successfully loaded {len(polygons)} region polygons")
|
||||
return polygons
|
||||
|
||||
|
||||
class RegionFinder:
|
||||
"""
|
||||
Efficient spatial index for finding which region a point belongs to.
|
||||
Uses Shapely's STRtree for fast spatial queries.
|
||||
"""
|
||||
def __init__(self, regions: Dict[str, MultiPolygon]):
|
||||
logger.info("Building spatial index for region lookup...")
|
||||
|
||||
self.regions = regions
|
||||
self.region_names = []
|
||||
self.geometries = []
|
||||
|
||||
for region_name, multi_polygon in regions.items():
|
||||
self.region_names.append(region_name)
|
||||
self.geometries.append(multi_polygon)
|
||||
|
||||
# Build R-tree spatial index for fast lookups
|
||||
self.tree = STRtree(self.geometries)
|
||||
|
||||
logger.info(f"Spatial index built with {len(self.geometries)} regions")
|
||||
|
||||
def find_region(self, lat: float, lon: float) -> str:
|
||||
"""
|
||||
Find which region a coordinate belongs to.
|
||||
|
||||
Returns region name or None if not found.
|
||||
"""
|
||||
point = Point(lon, lat) # Note: Shapely uses (x, y) = (lon, lat)
|
||||
|
||||
# Query the spatial index for candidate polygons
|
||||
candidates = self.tree.query(point)
|
||||
|
||||
# Check each candidate to see if point is actually inside
|
||||
for idx in candidates:
|
||||
if self.geometries[idx].contains(point):
|
||||
return self.region_names[idx]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def write_binary_file(output_path: Path, points: List[Tuple[float, float, str]]):
|
||||
@@ -109,12 +235,21 @@ def write_binary_file(output_path: Path, points: List[Tuple[float, float, str]])
|
||||
logger.info(f"Wrote {point_count} points to {output_path}")
|
||||
|
||||
|
||||
def process_parquet_streaming(parquet_url: str, output_dir: Path, batch_size: int = 100000):
|
||||
def process_parquet_streaming(parquet_url: str, output_dir: Path, borders_dir: Path, batch_size: int = 100000):
|
||||
"""
|
||||
Stream the Panoramax parquet file and write per-country binary files.
|
||||
|
||||
Uses DuckDB to stream the large parquet file without loading it entirely into memory.
|
||||
Uses .poly files from borders_dir to categorize points into regions.
|
||||
"""
|
||||
# Load region polygons and build spatial index
|
||||
regions = load_country_polygons(borders_dir)
|
||||
if not regions:
|
||||
logger.error("No regions loaded - cannot process panoramax data")
|
||||
return
|
||||
|
||||
region_finder = RegionFinder(regions)
|
||||
|
||||
conn = duckdb.connect(database=':memory:')
|
||||
|
||||
# Enable httpfs extension for remote file access
|
||||
@@ -175,12 +310,12 @@ def process_parquet_streaming(parquet_url: str, output_dir: Path, batch_size: in
|
||||
for row in batch:
|
||||
lat, lon, image_id = row
|
||||
|
||||
# Determine country
|
||||
country = determine_country_from_coords(lat, lon, conn)
|
||||
# Find which region this point belongs to
|
||||
region = region_finder.find_region(lat, lon)
|
||||
|
||||
# Skip unknown countries for now (or save to separate file)
|
||||
if country != "Unknown":
|
||||
country_points[country].append((lat, lon, str(image_id)))
|
||||
# Only add points that fall within a defined region
|
||||
if region:
|
||||
country_points[region].append((lat, lon, str(image_id)))
|
||||
|
||||
# Periodically write to disk to avoid memory issues
|
||||
if batch_count % 10 == 0:
|
||||
@@ -229,9 +364,10 @@ def main():
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--polygons',
|
||||
'--borders-dir',
|
||||
type=Path,
|
||||
help='Path to packed_polygons.bin file (optional, for accurate country detection)'
|
||||
default=Path(__file__).parent.parent.parent.parent / 'data' / 'borders',
|
||||
help='Path to directory containing .poly border files (default: <repo>/data/borders)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
@@ -246,19 +382,19 @@ def main():
|
||||
logger.info("Panoramax Preprocessor starting")
|
||||
logger.info(f"Input: {args.input}")
|
||||
logger.info(f"Output directory: {args.output}")
|
||||
logger.info(f"Borders directory: {args.borders_dir}")
|
||||
logger.info(f"Batch size: {args.batch_size}")
|
||||
|
||||
if args.polygons:
|
||||
logger.info(f"Country polygons: {args.polygons}")
|
||||
# TODO: Load and use country polygons for accurate spatial join
|
||||
else:
|
||||
logger.warning("No country polygons provided - using simplified country detection")
|
||||
# Verify borders directory exists
|
||||
if not args.borders_dir.exists():
|
||||
logger.error(f"Borders directory not found: {args.borders_dir}")
|
||||
sys.exit(1)
|
||||
|
||||
# Create output directory
|
||||
args.output.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Process the parquet file
|
||||
process_parquet_streaming(args.input, args.output, args.batch_size)
|
||||
process_parquet_streaming(args.input, args.output, args.borders_dir, args.batch_size)
|
||||
|
||||
logger.info("Panoramax preprocessing complete!")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user