import pandas as pd import os import logging import numpy as np import ast import math from pathlib import Path # define logger logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("process_data.log"), logging.StreamHandler(), ], ) CITIES_DATA = os.path.join("data", "raw", "2024_08_20_cities_1310_v5.csv") DATA_ENRICHED = os.path.join("data", "cities_enriched.csv") # meta data for kreis codes ( variable in coordinates table) NAME_CODE_DATA = os.path.join("data", "raw", "name_kreiscode.csv") CODES_KOMMUNEN = os.path.join("data", "raw", "Deutschlandatlas.csv") # coordinates for Gemeinden COORDINATES = os.path.join("data", "raw", "coordinates_plz_kreiscode.csv") MISSING = os.path.join("data", "missing_first_parser.csv") if not os.path.exists(os.path.join("data", "preprocessed")): Path(os.path.join("data", "preprocessed")).mkdir(parents=True, exist_ok=True) def load_cities(path: str) -> pd.DataFrame: df = pd.read_csv(path) df.drop_duplicates(subset="Kommune", keep="first", inplace=True) return df def create_code_mapper(path: str) -> dict: name_code = pd.read_csv( path, sep=";", encoding="latin_1", names=["Datum", "Code", "Name", "Fläche"] )[7:13929] # adds all Landkreise and gemeinden to the mapper code_mapper = { (key if type(key) != float else "0000"): value for key, value in zip(name_code["Name"], name_code["Code"]) } # adds all gemeindeverbände to the mapper kommunen_code = pd.read_csv(CODES_KOMMUNEN, sep=";", encoding="latin_1") code_mapper_update = { key: value for key, value in zip(kommunen_code["name"], kommunen_code["Gebietskennziffer"]) } print(code_mapper_update) code_mapper.update(code_mapper_update) return code_mapper def map_code(org_name, code_mapper): # Split the org_name string into parts parts = org_name.split() # print(parts, type(parts[0])) # Find a key in code_mapper that contains all parts of the split org_name for key in code_mapper.keys(): # look first for whole name (cases like "Landkreis München" , "kreisfreie Stadt München") if all(part in key for part in parts): return code_mapper[key] elif any(part in key for part in parts): return code_mapper[key] # Return None or a default value if no key matches all parts return None # main goal with this: identify Landkreise and their codes def add_code(df: pd.DataFrame, code_mapper: dict) -> pd.DataFrame: """Add the (Kreis-/Gemeinde-)code to the dataframe based on the name of the (administrative) region.""" df["Code"] = df["Kommune"].apply(lambda x: map_code(x, code_mapper)) df[df["Code"].isnull()]["Code"] = df[df["Code"].isnull()]["name"].apply( lambda x: map_code(x, code_mapper) ) df["Code"] = df["Code"].apply(lambda x: int(x) if x is not None else None) return df def org_in_plzname(org_name, plz_name): parts = org_name.split() if any(part in plz_name or part in plz_name for part in parts): return True else: return False def load_coordinates(path: str) -> pd.DataFrame: return pd.read_csv(path, sep=";") # maybe 2d coordinates instead of geometry def merge_coordinates(df: pd.DataFrame, coordinates: pd.DataFrame) -> pd.DataFrame: """Merge the coordinates of the regions to the dataframe. Try to use Kreiscode first, if it consists of 5 digits. Else, use the name of the region. """ geometries = [] modified_rows = [] for row in df.itertuples(): # adds coordinates for Landkreise if pd.notna(row.Code) and ( len(str(int(row.Code))) == 5 or len(str(int(row.Code))) == 4 ): coor = coordinates[coordinates["Kreis code"] == row.Code] geometry = [co.geo_point_2d for co in coor.itertuples()] geometries.append(geometry) modified_row = row else: coor = coordinates[ coordinates["PLZ Name (short)"].apply( lambda x: org_in_plzname(row.Kommune, x) ) ] # adds coordinates for Kommunen in coordinates table if len(coor) > 0: geometry = [co.geo_point_2d for co in coor.itertuples()] geometries.append(geometry) modified_row = row # adds coordinates from infered kreis code if Gebietskennziffer available elif row.Code and pd.notna(row.Code): # and not math.isnan(row.Code): if len(str(int(row.Code))) < 4: code_str = str(int(row.Code)) coor = coordinates[ coordinates["Kreis code"] .astype(str) .apply(lambda x: x[: len(code_str)]) == code_str ] geometry = [co.geo_point_2d for co in coor.itertuples()] geometries.append(geometry) elif str(row.Code)[:2] in ["11", "12", "13", "14", "15", "16"]: coor = coordinates[ coordinates["Kreis code"] == int(str(row.Code)[:5]) ] else: coor = coordinates[ coordinates["Kreis code"] == int(str(row.Code)[:4]) ] geometry = [co.geo_point_2d for co in coor.itertuples()] geometries.append(geometry) modified_row = row else: # tries to infer coordinates from name instead of Kommune coor = coordinates[ coordinates["PLZ Name (short)"].apply( lambda x: (org_in_plzname(row.name, x)) ) ] # adds coordinates for name in coordinates table if len(coor) > 0: geometry = [co.geo_point_2d for co in coor.itertuples()] geometries.append(geometry) # switch name and Kommune kommune_new = row.Kommune name_new = row.name modified_row = row._replace(Kommune=name_new, name=kommune_new) print(modified_row) else: geometries.append([]) modified_row = row modified_rows.append(modified_row) df["Geometry"] = geometries # print(modified_rows) modified = pd.DataFrame(modified_rows) modified["Geometry"] = geometries return modified def aggregate_coordinates(geo_element: str) -> list: # Convert the string representation of a list into an actual list if geo_element == "[]" or geo_element == []: return [] else: actual_list = geo_element # ast.literal_eval(geo_element) processed_list = [list(map(float, coord.split(", "))) for coord in actual_list] # print(processed_list) if len(processed_list) > 1: coordinates = np.mean(np.array(processed_list), axis=0) else: coordinates = np.array(processed_list[0]) return coordinates if __name__ == "__main__": code_mapper = create_code_mapper(NAME_CODE_DATA) logging.info("Code mapper created") cities = load_cities(CITIES_DATA) data = add_code(cities, code_mapper) missing = data[data["Code"].isnull()] logging.info(f"Missing values Gebietscode: {len(missing)}") data.to_csv( os.path.join("data", "preprocessed", "cities_enriched_with_code.csv"), index=False, ) # data = pd.read_csv( # os.path.join("data", "preprocessed", "cities_enriched_with_code.csv")) data["Code"] = data["Code"].apply(lambda x: int(x) if pd.notna(x) else None) coordinates = load_coordinates(COORDINATES) data = merge_coordinates(data, coordinates) data.to_csv( os.path.join("data", "preprocessed", "cities_enriched_with_coordinates.csv"), index=False, ) logging.info("Coordinates merged") missing = data[ [ all([x, y]) for x, y in zip( data["Geometry"].apply(lambda x: x == []), data["Code"].isnull() ) ] ] missing_geometry = data[data["Geometry"].apply(lambda x: x == [])] logging.info(f"Missing values: {len(missing)}") logging.info(f"Missing geometry: {len(missing_geometry)}") missing_geometry.to_csv(MISSING, index=False) # data = pd.read_csv(os.path.join("data", "cities_enriched_manually.csv")) data["Geometry"] = data["Geometry"].apply(aggregate_coordinates) data.to_csv(DATA_ENRICHED, index=False)