# Overpass API Integration
Status: ✅ Vollständig dokumentiert
## Übersicht
Die Overpass API Integration in p2d2 ermöglicht den Zugriff auf OpenStreetMap-Daten für administrative Grenzen und Friedhöfe. Die Implementierung bietet Load-Balancing über mehrere Endpoints, robuste Fehlerbehandlung und effiziente Query-Optimierung.
## Architektur

### Python Overpass Client
python
class OverpassDownloader:
    """Load-balanced Overpass API client with retry logic."""

    # Public Overpass API mirrors; calls are rotated round-robin across
    # them to spread load and survive single-mirror outages.
    ENDPOINTS = [
        "https://overpass-api.de/api/interpreter",
        "https://overpass.kumi.systems/api/interpreter",
        "https://overpass.openstreetmap.fr/api/interpreter",
        "https://overpass.openstreetmap.ru/api/interpreter",
        "https://maps.mail.ru/osm/tools/overpass/api/interpreter",
        "https://overpass.private.coffee/api/interpreter",
        "https://z.overpass-api.de/api/interpreter",
    ]

    def __init__(self, timeout=180):
        # Request timeout in seconds (also embedded into generated queries).
        self.timeout = timeout
        # Round-robin cursor: index of the next endpoint to use.
        self.current_endpoint = 0

## Query Builder
### Administrative Grenzen
python
def build_query(self, municipality_name: str, admin_level: int) -> str:
    """Build an area-based Overpass query for administrative boundaries.

    Args:
        municipality_name: Value of the OSM ``name`` tag to match.
        admin_level: OSM ``admin_level`` to filter on.

    Returns:
        Overpass QL query string (JSON output, geometry included).
    """
    # Escape embedded double quotes so the name cannot break out of the
    # quoted QL filter value.
    safe_name = municipality_name.replace('"', '\\"')
    return f'''[out:json][timeout:{self.timeout}][maxsize:1073741824];
relation["boundary"="administrative"]["admin_level"={admin_level}]["name"="{safe_name}"];
out geom;'''

### Friedhöfe
python
def build_cemetery_query(self, municipality_name: str) -> str:
    """Build a cemetery query scoped to the municipality's admin area.

    Resolves the municipality to an Overpass area, then collects cemetery
    ways and multipolygon relations inside it.
    """
    # Escape quotes for consistency with build_query (previously the raw
    # name was interpolated, so names containing '"' broke the query).
    safe_name = municipality_name.replace('"', '\\"')
    return f"""
[out:json][timeout:{self.timeout}][maxsize:1073741824];
(
area[name="{safe_name}"][boundary=administrative] -> .area0;
way[landuse=cemetery](area.area0);
relation[landuse=cemetery][type=multipolygon](area.area0);
);
out geom;
"""

## Verwendung
### Python Script Integration
python
from typing import Dict, List

from overpass_downloader import OverpassDownloader

# Initialize the client
downloader = OverpassDownloader(timeout=180)

# Fetch administrative boundaries
def fetch_admin_polygons(kommune: str, levels: List[int]) -> Dict:
    """Download boundaries per admin level; a failed level is logged and skipped."""
    results = {}
    for level in levels:
        try:
            data = downloader.download_admin_level(kommune, level)
            results[level] = data
            print(f"Level {level}: {len(data.get('elements', []))} Elemente")
        except Exception as e:
            print(f"Fehler bei Level {level}: {e}")
    return results

# Fetch cemeteries
def fetch_cemeteries(kommune: str) -> Dict:
    """Download cemeteries; best-effort, returns {} on any failure."""
    try:
        data = downloader.download_cemeteries(kommune)
        cemetery_count = len(data.get('elements', []))
        print(f"Friedhöfe gefunden: {cemetery_count}")
        return data
    except Exception as e:
        print(f"Fehler beim Laden der Friedhöfe: {e}")
        return {}

### TypeScript Bridge
typescript
// Python Script Aufruf aus TypeScript
async function fetchAdminPolygons(
kommune: string,
level: number
): Promise<any> {
return new Promise((resolve, reject) => {
const args = [
"src/scripts/fetch_admin_polygons.py",
"--kommune", kommune,
"--levels", level.toString(),
"--debug"
];
const pythonProcess = spawn("python", args);
let output = "";
let stderr = "";
pythonProcess.stdout.on("data", (data) => (output += data.toString()));
pythonProcess.stderr.on("data", (data) => (stderr += data.toString()));
pythonProcess.on("close", (code) => {
if (code === 0) {
try {
const jsonMatch = output.match(/\{[\s\S]*\}/);
if (!jsonMatch) {
reject(new Error("No JSON found in Python script output"));
return;
}
const result = JSON.parse(jsonMatch[0]);
resolve(result);
} catch (e) {
reject(new Error("Invalid JSON from Python script"));
}
} else {
reject(new Error(`Python script failed with code ${code}: ${stderr}`));
}
});
});
}Load Balancing & Retry Logic
### Intelligente Endpoint-Rotation
python
def query_overpass(self, query: str) -> Dict:
    """Execute an Overpass query with load balancing and retry logic.

    Makes up to 3 attempts, each against the next endpoint in round-robin
    order, with exponential backoff between attempts.

    Raises:
        Exception: when all attempts fail.
    """
    for attempt in range(3):
        endpoint = self.ENDPOINTS[self.current_endpoint]
        # Advance the cursor *before* the request so a retry (or the next
        # call) hits a different mirror.
        self.current_endpoint = (self.current_endpoint + 1) % len(self.ENDPOINTS)
        try:
            logger.info(f"Attempt {attempt + 1}/3 using endpoint: {endpoint}")
            response = requests.post(
                endpoint,
                data=query,
                headers={"Content-Type": "text/plain"},
                timeout=self.timeout,
            )
            if response.status_code == 200:
                data = response.json()
                element_count = len(data.get("elements", []))
                logger.info(f"Successfully fetched {element_count} elements")
                return data
            else:
                # Non-200 (rate limit, gateway error, ...) falls through to retry.
                logger.warning(f"HTTP {response.status_code} from {endpoint}")
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout from {endpoint}")
        except requests.exceptions.ConnectionError:
            logger.warning(f"Connection error from {endpoint}")
        except Exception as e:
            logger.warning(f"Request failed: {e}")
        if attempt < 2:  # Don't sleep on last attempt
            time.sleep(2**attempt)  # Exponential backoff: 1s, 2s
    raise Exception("All Overpass endpoints failed")

## Query Optimierungen
### Effiziente Area-Queries
python
def build_optimized_area_query(municipality_name: str, admin_level: int) -> str:
    """Optimized query using an area reference to constrain the search space."""
    # Escape quotes so the name is safe inside the QL filter value.
    safe_name = municipality_name.replace('"', '\\"')
    return f"""
[out:json][timeout:180];
area["name"="{safe_name}"][boundary=administrative] -> .searchArea;
(
relation["boundary"="administrative"]["admin_level"={admin_level}](area.searchArea);
way["boundary"="administrative"]["admin_level"={admin_level}](area.searchArea);
);
out geom;
"""

### Batch Processing für mehrere Levels
python
def build_multi_level_query(municipality_name: str, levels: List[int]) -> str:
    """Query several admin levels in one request (larger timeout/maxsize)."""
    safe_name = municipality_name.replace('"', '\\"')
    # One relation filter per requested level, joined into a union block.
    level_conditions = "".join(
        f' relation["boundary"="administrative"]["admin_level"={level}]["name"="{safe_name}"];\n'
        for level in levels
    )
    return f"""
[out:json][timeout:300][maxsize:2147483648];
(
{level_conditions}
);
out geom;
"""

## Error Handling
### Robuste Fehlerbehandlung
python
class ResilientOverpassDownloader(OverpassDownloader):
    """Extended downloader with improved error handling."""

    def download_with_fallback(self, municipality_name: str, admin_level: int) -> Dict:
        """Download with fallback strategies; never raises to the caller."""
        try:
            # Primary: admin-level-specific query
            return self.download_admin_level(municipality_name, admin_level)
        except Exception as primary_error:
            logger.warning(f"Primary query failed, trying fallback: {primary_error}")
            try:
                # Fallback 1: broader search without the admin-level filter
                return self.download_without_level_filter(municipality_name)
            except Exception as fallback_error:
                logger.error(f"All fallbacks failed: {fallback_error}")
                # Fallback 2: return an empty, tagged result instead of raising
                return {"elements": [], "remark": "fallback_empty_result"}

    def download_without_level_filter(self, municipality_name: str) -> Dict:
        """Download without the admin-level filter for a larger hit rate."""
        # Escape quotes, consistent with the other query builders (the raw
        # name was previously interpolated unescaped).
        safe_name = municipality_name.replace('"', '\\"')
        query = f"""
[out:json][timeout:180];
relation["boundary"="administrative"]["name"="{safe_name}"];
out geom;
"""
        return self.query_overpass(query)

### Rate Limiting
python
class RateLimitedOverpassDownloader(OverpassDownloader):
    """Downloader with client-side rate limiting."""

    def __init__(self, timeout=180, requests_per_minute=30):
        super().__init__(timeout)
        self.requests_per_minute = requests_per_minute
        # time.time() stamps of requests issued within the last minute.
        self.request_times = []

    def query_overpass(self, query: str) -> Dict:
        """Rate-limited query execution."""
        self._enforce_rate_limit()
        self.request_times.append(time.time())
        return super().query_overpass(query)

    def _enforce_rate_limit(self):
        """Block until a request can be sent without exceeding the per-minute limit."""
        now = time.time()
        one_minute_ago = now - 60
        # Drop timestamps older than one minute
        self.request_times = [t for t in self.request_times if t > one_minute_ago]
        # If at the limit, sleep until the oldest request ages out of the window
        if len(self.request_times) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.request_times[0])
            if sleep_time > 0:
                logger.info(f"Rate limit reached, sleeping for {sleep_time:.1f}s")
                time.sleep(sleep_time)

## Performance Monitoring
### Query Performance Tracking
python
class MonitoredOverpassDownloader(OverpassDownloader):
    """Downloader with performance monitoring."""

    def __init__(self, timeout=180):
        super().__init__(timeout)
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'average_response_time': 0,
            'endpoint_usage': {}
        }

    def query_overpass(self, query: str) -> Dict:
        """Track performance metrics for each query."""
        start_time = time.time()
        self.metrics['total_requests'] += 1
        try:
            result = super().query_overpass(query)
            duration = time.time() - start_time
            self.metrics['successful_requests'] += 1
            self.metrics['average_response_time'] = self._update_average_time(duration)
            # BUG FIX: the parent advances `current_endpoint` *past* the endpoint
            # it just used before returning, so the endpoint that actually served
            # this request is the previous index, not the current one.
            endpoint = self.ENDPOINTS[(self.current_endpoint - 1) % len(self.ENDPOINTS)]
            self.metrics['endpoint_usage'][endpoint] = \
                self.metrics['endpoint_usage'].get(endpoint, 0) + 1
            logger.info(f"Query completed in {duration:.2f}s")
            return result
        except Exception as e:
            self.metrics['failed_requests'] += 1
            logger.error(f"Query failed after {time.time() - start_time:.2f}s: {e}")
            raise

    def _update_average_time(self, new_time: float) -> float:
        """Update the running average of successful response times."""
        total_requests = self.metrics['successful_requests']
        current_avg = self.metrics['average_response_time']
        if total_requests == 1:
            return new_time
        else:
            return (current_avg * (total_requests - 1) + new_time) / total_requests

    def get_metrics(self) -> Dict:
        """Return the raw metrics plus derived success-rate figures."""
        success_rate = (self.metrics['successful_requests'] / self.metrics['total_requests'] * 100) \
            if self.metrics['total_requests'] > 0 else 0
        return {
            **self.metrics,
            'success_rate': f"{success_rate:.1f}%",
            'total_endpoints_used': len(self.metrics['endpoint_usage'])
        }

## Best Practices
### Query Optimization
python
# ✅ Correct - specific, efficient queries
def build_efficient_query(municipality: str, level: int) -> str:
    return f'''
[out:json][timeout:120];
relation["boundary"="administrative"]["admin_level"={level}]["name"="{municipality}"];
out geom;
'''

# ❌ Avoid - overly general queries
def build_inefficient_query(municipality: str) -> str:
    return f'''
[out:json];
relation["name"="{municipality}"];
out geom;
'''  # No filters, can be very slow

### Error Handling
python
# ✅ Correct - comprehensive error handling
def safe_download(municipality: str, level: int) -> Dict:
    """Download one admin level; maps known failures to tagged empty results."""
    try:
        return downloader.download_admin_level(municipality, level)
    except requests.exceptions.Timeout:
        logger.error(f"Timeout für {municipality} Level {level}")
        return {"elements": [], "error": "timeout"}
    except requests.exceptions.ConnectionError:
        logger.error(f"Verbindungsfehler für {municipality} Level {level}")
        return {"elements": [], "error": "connection"}
    except Exception as e:
        logger.error(f"Unerwarteter Fehler: {e}")
        return {"elements": [], "error": "unknown"}

# ❌ Avoid - unhandled errors
data = downloader.download_admin_level(municipality, level)  # no error handling

## Konfiguration
### Environment Settings
python
# Konfiguration für verschiedene Umgebungen
OVERPASS_CONFIG = {
'development': {
'timeout': 180,
'max_retries': 3,
'requests_per_minute': 30
},
'production': {
'timeout': 300,
'max_retries': 5,
'requests_per_minute': 60
},
'testing': {
'timeout': 60,
'max_retries': 1,
'requests_per_minute': 10
}
}
def create_downloader(environment: str = 'development') -> OverpassDownloader:
config = OVERPASS_CONFIG[environment]
return OverpassDownloader(
timeout=config['timeout'],
# Weitere Konfiguration...
)Diese Overpass API Integration bietet eine robuste, performante und skalierbare Lösung für den Zugriff auf OSM-Daten in p2d2, mit umfassendem Load-Balancing, Error-Handling und Monitoring.