# Copyright (C) 2019-2021 HERE Europe B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# License-Filename: LICENSE
"""
This is a collection of utilities for using XYZ Hub.
Actually, they are almost unspecific to any XYZ Hub functionality, apart
from :func:`feature_to_bbox`, but convenient to use.
"""
import logging
import math
import os
import warnings
from itertools import zip_longest
from typing import TYPE_CHECKING, List, Optional
from geojson import Feature, FeatureCollection, Point, Polygon
from xyzspaces._compact import HAS_GEOPANDAS, HAS_TURFPY
if TYPE_CHECKING:
import geopandas as gpd
if HAS_TURFPY:
from turfpy.measurement import bbox, bbox_polygon, distance, length
from turfpy.transformation import intersect
if HAS_GEOPANDAS:
import geopandas as gpd # noqa
from shapely import geometry, wkt
logger = logging.getLogger(__name__)
def join_string_lists(**kwargs) -> dict:
    """Collapse named lists of strings into one dict of comma-joined strings.

    Arguments whose value is falsy (``None``, an empty list, ...) are left
    out of the result.

    :param kwargs: Lists of strings, passed by keyword.
    :return: A dict mapping every kept keyword to its comma-separated string.

    Example:

    >>> join_string_lists(foo=["a", "b", "c"], bar=["a", "b"], foobar=None)
    {'foo': 'a,b,c', 'bar': 'a,b'}
    """
    joined = {}
    for name, strings in kwargs.items():
        if not strings:
            # Skip missing / empty entries entirely.
            continue
        joined[name] = ",".join(strings)
    return joined
# TODO: Check whether this is already provided by the geojson package...
# Almost: list(geojson.coords(obj))
# This should also be a field in the feature JSON blob...
def feature_to_bbox(feature: dict) -> List[float]:
    """Extract bounding box from GeoJSON feature rectangle.

    Only the first ring of the feature's geometry coordinates is inspected.
    The input feature is left untouched.

    :param feature: A dict representing a GeoJSON feature.
    :return: A list of four floats representing West, South, East and North
        margins of the resulting bounding box.
    :raises IndexError: If the first coordinate ring is empty.
    """
    ring = feature["geometry"]["coordinates"][0]
    # Each position is indexed instead of unpacked so that an optional
    # altitude is tolerated: lon = c[0], lat = c[1], alt = c[2] (if present).
    first = ring[0]
    w = e = first[0]
    s = n = first[1]
    # Every vertex takes part in all four comparisons; a repeated closing
    # point is harmless for min/max, so the ring never has to be modified.
    for c in ring[1:]:
        w = min(w, c[0])
        e = max(e, c[0])
        s = min(s, c[1])
        n = max(n, c[1])
    return [w, s, e, n]
def get_xyz_token() -> str:
    """
    Look up the XYZ token stored in the environment variable ``XYZ_TOKEN``.

    A warning is issued when the variable is not defined.

    :return: The string value of the environment variable or an empty string
        if no such variable could be found.
    """
    try:
        token = os.environ["XYZ_TOKEN"]
    except KeyError:
        warnings.warn("No token found in environment variable XYZ_TOKEN.")
        return ""
    return token
def grouper(size, iterable, fillvalue=None):
    """
    Split the given iterable into consecutive groups of ``size`` items.

    :param size: An int representing size of each group.
    :param iterable: An iterable.
    :param fillvalue: Value used to pad the last group when the iterable
        runs out before the group is full.
    :return: An iterator yielding tuples of ``size`` items each.
    """
    # All slots share ONE iterator, so each tuple consumes the next
    # ``size`` items of the input.
    shared = iter(iterable)
    slots = [shared for _ in range(size)]
    return zip_longest(*slots, fillvalue=fillvalue)
def wkt_to_geojson(wkt_data: str) -> dict:
    """
    Convert a WKT string to GeoJSON.

    A WKT ``GeometryCollection`` becomes a ``FeatureCollection`` holding one
    ``Feature`` per member geometry; any other geometry becomes a single
    ``Feature``.

    :param wkt_data: wkt data to be converted
    :return: Geojson
    """
    mapped = geometry.mapping(wkt.loads(wkt_data))
    if mapped["type"] != "GeometryCollection":
        return Feature(geometry=mapped)
    members = [Feature(geometry=part) for part in mapped["geometries"]]
    return FeatureCollection(members)
def grid(bbox, cell_width, cell_height, units):
    """
    Generate a grid of rectangular cells covering the given bounding box.

    The requested cell sizes are converted to degrees by comparing them with
    the measured length of the box's south edge (width) and west edge
    (height).

    :param bbox: bounding box coordinates, indexed as west, south, east,
        north
    :param cell_width: Cell width specified in units
    :param cell_height: Cell height specified in units
    :param units: Units for given sizes
    :return: FeatureCollection of grid boxes generated
        for the given bounding box
    """
    west, south, east, north = bbox[0], bbox[1], bbox[2], bbox[3]

    # Cell width in degrees: requested share of the south edge's length.
    south_west = Feature(geometry=Point((west, south)))
    south_east = Feature(geometry=Point((east, south)))
    step_x = (cell_width / (distance(south_west, south_east, units))) * (
        east - west
    )

    # Cell height in degrees: requested share of the west edge's length.
    origin = Feature(geometry=Point((west, south)))
    north_west = Feature(geometry=Point((west, north)))
    step_y = (cell_height / (distance(origin, north_west, units))) * (
        north - south
    )

    span_x = east - west
    span_y = north - south
    n_columns = math.ceil(span_x / step_x)
    n_rows = math.ceil(span_y / step_y)

    # The grid may overshoot the bbox; shift it so the overshoot is split
    # evenly between both sides (i.e. the grid is centred on the bbox).
    offset_x = (span_x - n_columns * step_x) / 2
    offset_y = (span_y - n_rows * step_y) / 2

    cells = []
    left = west + offset_x
    for _ in range(n_columns):
        right = left + step_x
        bottom = south + offset_y
        for _ in range(n_rows):
            top = bottom + step_y
            ring = [
                [left, bottom],
                [left, top],
                [right, top],
                [right, bottom],
                [left, bottom],
            ]
            cells.append(Feature(geometry=Polygon([ring])))
            bottom = top
        left = right
    return FeatureCollection(cells)
def divide_bbox(
    feature: dict,
    cell_width: Optional[float] = None,
    units: Optional[str] = "m",
):
    """
    Divide the given feature into grid boxes of the given cell width.

    A square grid is laid over the feature's bounding box and every grid
    cell is intersected with the feature. Cells whose intersection fails or
    is empty are skipped.

    :param feature: Feature to be divided into a grid
    :param cell_width: Width (and height) of each grid box. When omitted
        (or falsy), a quarter of the length of the bounding box polygon is
        used.
    :param units: Units for the width of the grid boxes
    :return: List of features into which
        the input feature is divided
    """
    bb = bbox(feature)
    if not cell_width:
        # Default cell size, computed once and used for both dimensions.
        cell_width = length(bbox_polygon(bb), units=units) / 4
    cells = grid(bb, cell_width, cell_width, units)

    final = []
    for cell in cells["features"]:
        try:
            inter = intersect([cell, feature])
        except Exception:
            # Best effort: a cell that cannot be intersected is skipped, but
            # the failure reason is kept in the debug log.
            logger.debug(
                "The intersection geometry is incorrect", exc_info=True
            )
            continue
        if inter:
            final.append(inter)
    return final
def flatten_geometry(data: "gpd.GeoDataFrame") -> "gpd.GeoDataFrame":
    """
    Flatten the geometries in the given GeoPandas dataframe.

    Flatten geometry is formed by extracting individual geometries from
    GeometryCollection, MultiPoint, MultiLineString, MultiPolygon. Only one
    level is unpacked, and only the geometry column is carried over to the
    result.

    :param data: GeoPandas dataframe to be flattened
    :return: Flat GeoPandas dataframe
    """
    multi_part_types = (
        "GeometryCollection",
        "MultiPoint",
        "MultiLineString",
        "MultiPolygon",
    )
    flattened = []
    for geom in data.geometry:
        # ``geom_type`` / ``.geoms`` are used instead of ``.type`` and direct
        # iteration, which Shapely 2.0 deprecated / removed.
        if geom.geom_type in multi_part_types:
            flattened.extend(geom.geoms)
        else:
            flattened.append(geom)
    return gpd.GeoDataFrame(geometry=flattened)