# =================================================================
#
# Authors: Just van den Broecke <justb4@gmail.com>
#
# Copyright (c) 2019 Just van den Broecke
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import importlib
import logging
from osgeo import gdal as osgeo_gdal
from osgeo import ogr as osgeo_ogr
from osgeo import osr as osgeo_osr
from pygeoapi.provider.base import (BaseProvider)
# Module-level logger, named after this module so that log output can be
# filtered/configured per module.
LOGGER = logging.getLogger(__name__)
class OGRProvider(BaseProvider):
    """
    OGR Provider. Uses GDAL/OGR Python-bindings to access OGR
    Vector sources. References:
    https://pcjericks.github.io/py-gdalogr-cookbook/
    https://www.gdal.org/ogr_formats.html (per-driver specifics).

    In theory any OGR source type (Driver) could be used, although
    some Source Types require Driver-specific handling. This is handled
    in Source Helper classes, instantiated per Source-Type.

    The following Source Types have been tested to work:
    GeoPackage (GPKG), SQLite, GeoJSON, ESRI Shapefile, WFS v2.
    """

    # To deal with some OGR Source-Driver specifics: maps an OGR source
    # type to the dotted path of its helper class; '*' is the fallback.
    SOURCE_HELPERS = {
        'ESRIJSON': 'pygeoapi.provider.ogr.ESRIJSONHelper',
        'WFS': 'pygeoapi.provider.ogr.WFSHelper',
        '*': 'pygeoapi.provider.ogr.CommonSourceHelper'
    }

    def __init__(self, provider_def):
        """
        Initialize object

        # Typical OGRProvider YAML config:

        provider:
            name: OGR
            data:
                source_type: WFS
                source: WFS:http://geodata.nationaalgeoregister.nl/rdinfo/wfs?
                source_srs: EPSG:28992
                target_srs: EPSG:4326
                source_capabilities:
                    paging: True
                source_options:
                    OGR_WFS_LOAD_MULTIPLE_LAYER_DEFN: NO
                # open_options:
                #     EXPOSE_GML_ID: NO
                gdal_ogr_options:
                    EMPTY_AS_NULL: NO
                    GDAL_CACHEMAX: 64
                    # GDAL_HTTP_PROXY: (optional proxy)
                    # GDAL_PROXY_AUTH: (optional auth for remote WFS)
                    CPL_DEBUG: NO
            id_field: gml_id
            layer: rdinfo:stations

        :param provider_def: provider definition
        :raises Exception: when no 'layer' is configured
        :returns: pygeoapi.provider.ogr.OGRProvider
        """
        BaseProvider.__init__(self, provider_def)

        self.ogr = osgeo_ogr
        # http://trac.osgeo.org/gdal/wiki/PythonGotchas
        self.gdal = osgeo_gdal
        self.gdal.UseExceptions()
        LOGGER.info('Using GDAL/OGR version: %d',
                    int(osgeo_gdal.VersionInfo('VERSION_NUM')))

        # GDAL error handler function
        # http://pcjericks.github.io/py-gdalogr-cookbook/gdal_general.html
        def gdal_error_handler(err_class, err_num, err_msg):
            err_type = {
                osgeo_gdal.CE_None: 'None',
                osgeo_gdal.CE_Debug: 'Debug',
                osgeo_gdal.CE_Warning: 'Warning',
                osgeo_gdal.CE_Failure: 'Failure',
                osgeo_gdal.CE_Fatal: 'Fatal'
            }
            # Keep every GDAL message on a single log line.
            err_msg = err_msg.replace('\n', ' ')
            err_class = err_type.get(err_class, 'None')
            LOGGER.error('Error Number: %s, Type: %s, Msg: %s',
                         err_num, err_class, err_msg)

        # install error handler
        self.gdal.PushErrorHandler(gdal_error_handler)

        LOGGER.debug('Setting OGR properties')
        self.data_def = provider_def['data']

        # Generic GDAL/OGR options (optional). `or {}` also covers a key
        # that is present in the config but left empty (None).
        gdal_ogr_options = self.data_def.get('gdal_ogr_options') or {}
        for key, value in gdal_ogr_options.items():
            self.gdal.SetConfigOption(key, str(value))

        # Driver-specific options (optional)
        source_options = self.data_def.get('source_options') or {}
        for key, value in source_options.items():
            self.gdal.SetConfigOption(key, str(value))

        # Open options
        self.open_options = self.data_def.get('open_options') or {}

        self.source_capabilities = (
            self.data_def.get('source_capabilities') or {'paging': False})

        # SRS values are configured as 'EPSG:<code>'; keep the int code.
        self.source_srs = int(self.data_def.get('source_srs',
                                                'EPSG:4326').split(':')[1])
        self.target_srs = int(self.data_def.get('target_srs',
                                                'EPSG:4326').split(':')[1])

        # Optional coordinate transformation inward (requests) and
        # outward (responses) when the source layers and WFS3 collections
        # differ in EPSG-codes.
        self.transform_in = None
        self.transform_out = None
        if self.source_srs != self.target_srs:
            source = osgeo_osr.SpatialReference()
            source.ImportFromEPSG(self.source_srs)

            target = osgeo_osr.SpatialReference()
            target.ImportFromEPSG(self.target_srs)

            self.transform_in = \
                osgeo_osr.CoordinateTransformation(target, source)
            self.transform_out = \
                osgeo_osr.CoordinateTransformation(source, target)

        self._load_source_helper(self.data_def['source_type'])

        # Layer name is required
        self.layer_name = provider_def.get('layer', None)
        if not self.layer_name:
            msg = 'Need explicit \'layer\' attr in provider config'
            LOGGER.error(msg)
            raise Exception(msg)

        # Init driver and Source connection
        self.driver = None
        self.conn = None

    def _paging_enabled(self):
        """
        Tell whether paging is enabled in the source capabilities.

        :returns: True when 'paging' is configured truthy, False when it
                  is falsy or absent
        """
        return bool(self.source_capabilities.get('paging', False))

    def _list_open_options(self):
        """
        Render the configured open options as 'KEY=VALUE' strings.

        :returns: list of 'KEY=VALUE' strings
        """
        return [
            f"{key}={str(value)}" for key, value in self.open_options.items()]

    def _open(self):
        """
        Open the configured OGR source and store driver and connection
        on this object.

        :raises Exception: when no driver exists for the source type or
                           the source cannot be opened
        """
        source_type = self.data_def['source_type']
        self.driver = self.ogr.GetDriverByName(source_type)
        if not self.driver:
            msg = 'No Driver for Source: {}'.format(source_type)
            LOGGER.error(msg)
            raise Exception(msg)

        if self.open_options:
            self.conn = self.gdal.OpenEx(
                self.data_def['source'],
                self.gdal.OF_VECTOR,
                open_options=self._list_open_options())
        else:
            self.conn = self.driver.Open(self.data_def['source'], 0)

        if not self.conn:
            msg = 'Cannot open OGR Source: %s' % self.data_def['source']
            LOGGER.error(msg)
            raise Exception(msg)

        # Always need to disable paging immediately after Open!
        if self._paging_enabled():
            self.source_helper.disable_paging()

    def _close(self):
        """
        Release helper resources and drop connection and driver.
        """
        # The helper may still need self.conn, so close it first.
        self.source_helper.close()
        self.conn = None
        LOGGER.debug('closed self.conn')
        self.driver = None

    def _get_layer(self):
        """
        Get the layer, opening the source first when not yet connected.

        :returns: whatever the source helper's get_layer() returns
        """
        if not self.conn:
            self._open()

        # Delegate getting Layer to SourceHelper
        return self.source_helper.get_layer()

    def get_fields(self):
        """
        Get provider field information (names, types)

        Errors are logged, not raised; the fields collected so far
        (possibly none) are returned.

        :returns: dict of fields
        """
        fields = {}
        try:
            layer_defn = self._get_layer().GetLayerDefn()
            for index in range(layer_defn.GetFieldCount()):
                field_defn = layer_defn.GetFieldDefn(index)
                type_name = field_defn.GetFieldTypeName(field_defn.GetType())
                fields[field_defn.GetName()] = type_name.lower()
        except Exception as err:
            LOGGER.error(err)
        finally:
            self._close()

        return fields

    def query(self, startindex=0, limit=10, resulttype='results',
              bbox=None, datetime=None, properties=None, sortby=None):
        """
        Query OGR source

        Errors are logged, not raised; None is returned on failure or
        on an invalid resulttype.

        :param startindex: starting record to return (default 0)
        :param limit: number of records to return (default 10)
        :param resulttype: return results or hit limit (default results)
        :param bbox: bounding box [minx,miny,maxx,maxy]
        :param datetime: temporal (datestamp or extent); not used here
        :param properties: list of tuples (name, value); not used here
        :param sortby: list of dicts (property, order); not used here

        :returns: dict of 0..n GeoJSON features
        """
        result = None
        try:
            if self._paging_enabled():
                self.source_helper.enable_paging(startindex, limit)

            layer = self._get_layer()

            if bbox:
                LOGGER.debug('processing bbox parameter')
                minx, miny, maxx, maxy = bbox

                wkt = "POLYGON (({minx} {miny},{minx} {maxy},{maxx} {maxy}," \
                      "{maxx} {miny},{minx} {miny}))".format(
                          minx=float(minx), miny=float(miny),
                          maxx=float(maxx), maxy=float(maxy))

                polygon = self.ogr.CreateGeometryFromWkt(wkt)
                # The bbox arrives in the target SRS; the filter must be
                # expressed in the source SRS.
                if self.transform_in:
                    polygon.Transform(self.transform_in)

                layer.SetSpatialFilter(polygon)

            # Make response based on resulttype specified
            if resulttype == 'hits':
                LOGGER.debug('hits only specified')
                result = self._response_feature_hits(layer)
            elif resulttype == 'results':
                LOGGER.debug('results specified')
                result = self._response_feature_collection(layer, limit)
            else:
                LOGGER.error('Invalid resulttype: %s' % resulttype)

        except Exception as err:
            LOGGER.error(err)

        finally:
            self._close()

        return result

    def get(self, identifier):
        """
        Get Feature by id

        Errors are logged, not raised; None is returned on failure or
        when no feature matches.

        :param identifier: feature id

        :returns: GeoJSON feature, or None
        """
        result = None
        try:
            LOGGER.debug('Fetching identifier {}'.format(identifier))
            layer = self._get_layer()

            # The identifier comes from the request: double any single
            # quote so it cannot terminate the SQL string literal.
            escaped_id = str(identifier).replace("'", "''")
            layer.SetAttributeFilter("{field} = '{id}'".format(
                field=self.id_field, id=escaped_id))

            ogr_feature = layer.GetNextFeature()
            if ogr_feature is None:
                LOGGER.debug('No feature found for identifier {}'.format(
                    identifier))
            else:
                result = self._ogr_feature_to_json(ogr_feature)

        except Exception as err:
            LOGGER.error(err)

        finally:
            self._close()

        return result

    def __repr__(self):
        return '<OGRProvider> {}'.format(self.data)

    def _load_source_helper(self, source_type):
        """
        Loads Source Helper by name and stores an instance of it
        in self.source_helper.

        :param source_type: Source type name; types without an entry in
                            SOURCE_HELPERS get the '*' helper
        """
        helpers = OGRProvider.SOURCE_HELPERS
        helper_type = source_type if source_type in helpers else '*'

        # Create object from full package.class name string.
        packagename, classname = helpers[helper_type].rsplit('.', 1)
        module = importlib.import_module(packagename)
        class_ = getattr(module, classname)
        self.source_helper = class_(self)

    def _ogr_feature_to_json(self, ogr_feature):
        """
        Convert an OGR Feature to a GeoJSON-like dict.

        The 'id' is taken (and removed) from the id_field property when
        present, otherwise the OGR FID is used.

        :param ogr_feature: OGR Feature object

        :returns: dict as produced by ExportToJson, with 'id' set
        """
        geom = ogr_feature.GetGeometryRef()
        # Features without geometry have nothing to reproject.
        if geom is not None and self.transform_out:
            # Optionally reproject the geometry
            geom.Transform(self.transform_out)

        json_feature = ogr_feature.ExportToJson(as_object=True)

        properties = json_feature.get('properties') or {}
        if self.id_field in properties:
            json_feature['id'] = properties.pop(self.id_field)
        else:
            json_feature['id'] = ogr_feature.GetFID()

        return json_feature

    def _response_feature_collection(self, layer, limit):
        """
        Assembles output from Layer query as
        GeoJSON FeatureCollection structure.

        :param layer: OGR Layer to read features from
        :param limit: stop after this many features have been read

        :returns: GeoJSON FeatureCollection
        """
        feature_collection = {
            'type': 'FeatureCollection',
            'features': []
        }

        # See https://github.com/OSGeo/gdal/blob/master/autotest/
        #     ogr/ogr_wfs.py#L313
        layer.ResetReading()
        ogr_feature = layer.GetNextFeature()
        count = 0
        while ogr_feature is not None:
            json_feature = self._ogr_feature_to_json(ogr_feature)
            feature_collection['features'].append(json_feature)

            count += 1
            if count == limit:
                break

            ogr_feature = layer.GetNextFeature()

        return feature_collection

    def _response_feature_hits(self, layer):
        """
        Assembles GeoJSON hits from OGR Feature count
        e.g: http://localhost:5000/collections/
        hotosm_bdi_waterways/items?resulttype=hits

        :param layer: OGR Layer to count features of

        :returns: GeoJSON FeaturesCollection
        """
        return {
            'type': 'FeatureCollection',
            'numberMatched': layer.GetFeatureCount(),
            'features': []
        }
class InvalidHelperError(Exception):
    """Invalid helper"""
class SourceHelper:
    """
    Helper classes for OGR-specific Source Types (Drivers).
    For some actions Driver-specific settings or processing is
    required. This is delegated to the OGR SourceHelper classes.
    """

    def __init__(self, provider):
        """
        Initialize object with related OGRProvider object.

        :param provider: provider instance

        :returns: pygeoapi.provider.ogr.SourceHelper
        """
        self.provider = provider

    def close(self):
        """
        OGR Driver-specific handling of closing dataset.
        Default is no specific handling.
        """
        pass

    def get_layer(self):
        """
        Default action to get a Layer object from opened OGR Driver:
        look it up by the provider's layer name on the provider's
        connection.

        :raises Exception: when the connection returns no layer

        :returns: the object returned by GetLayerByName
        """
        layer = self.provider.conn.GetLayerByName(self.provider.layer_name)
        if not layer:
            msg = 'Cannot get Layer {} from OGR Source'.\
                format(self.provider.layer_name)
            LOGGER.error(msg)
            raise Exception(msg)

        return layer

    def enable_paging(self, startindex=-1, limit=-1):
        """
        Enable paged access to dataset (OGR Driver-specific).
        Default is no specific handling.

        :param startindex: index of first record of the page
        :param limit: page size
        """
        pass

    def disable_paging(self):
        """
        Disable paged access to dataset (OGR Driver-specific).
        Default is no specific handling.
        """
        pass
class CommonSourceHelper(SourceHelper):
    """
    SourceHelper for most common OGR Source types:
    Shapefile, GeoPackage, SQLite, GeoJSON etc.
    """

    def __init__(self, provider):
        """
        Initialize object

        :param provider: provider instance

        :returns: pygeoapi.provider.ogr.CommonSourceHelper
        """
        SourceHelper.__init__(self, provider)
        # -1 means "no paging requested"; see enable_paging/get_layer.
        self.startindex = -1
        self.limit = -1
        # ResultSet layer from ExecuteSQL, pending release in close().
        self.result_set = None

    def close(self):
        """
        OGR Driver-specific handling of closing dataset.
        If ExecuteSQL has been (successfully) called
        must close ResultSet explicitly.
        https://gis.stackexchange.com/questions/114112/explicitly-close-a-ogr-result-object-from-a-call-to-executesql  # noqa

        Errors while releasing are logged, not raised.
        """
        if not self.result_set:
            return

        try:
            self.provider.conn.ReleaseResultSet(self.result_set)
        except Exception as err:
            msg = 'ReleaseResultSet exception for Layer {}'.format(
                self.provider.layer_name)
            # Pass the error through placeholders: a message without
            # placeholders plus an extra arg makes logging itself fail.
            LOGGER.error('%s: %s', msg, err)
        finally:
            self.result_set = None

    def enable_paging(self, startindex=-1, limit=-1):
        """
        Enable paged access to dataset (OGR Driver-specific)
        using OGR SQL https://www.gdal.org/ogr_sql.html
        e.g. SELECT * FROM poly LIMIT 10 OFFSET 30

        Only records the values; they are consumed by the next
        get_layer() call.

        :param startindex: index of first record of the page
        :param limit: page size
        """
        self.startindex = startindex
        self.limit = limit

    def disable_paging(self):
        """
        Disable paged access to dataset (OGR Driver-specific).
        Nothing to do here.
        """
        pass

    def get_layer(self):
        """
        Gets OGR Layer from opened OGR dataset.
        When startindex defined 1 or greater will invoke
        OGR SQL SELECT with LIMIT and OFFSET and return
        as Layer as ResultSet from ExecuteSQL on dataset.

        :raises Exception: when ExecuteSQL yields no result set

        :return: OGR layer object
        """
        if self.startindex <= 0:
            return SourceHelper.get_layer(self)

        # Release any earlier ResultSet before creating a new one.
        self.close()

        sql = "SELECT * FROM {ds_name} LIMIT {limit} OFFSET {offset}".format(
            ds_name=self.provider.layer_name,
            limit=self.limit,
            offset=self.startindex)
        self.result_set = self.provider.conn.ExecuteSQL(sql)

        # Reset since needs to be set each time explicitly
        self.startindex = -1
        self.limit = -1

        if not self.result_set:
            msg = 'Cannot get Layer {} via ExecuteSQL'.format(
                self.provider.layer_name)
            LOGGER.error(msg)
            raise Exception(msg)

        return self.result_set
class ESRIJSONHelper(SourceHelper):
    """
    SourceHelper for the ESRIJSON Source type: controls paging
    through GDAL config options.
    """

    def __init__(self, provider):
        """
        Initialize object

        :param provider: provider instance

        :returns: pygeoapi.provider.ogr.ESRIJSONHelper
        """
        SourceHelper.__init__(self, provider)

    def enable_paging(self, startindex=-1, limit=-1):
        """
        Enable paged access to dataset (OGR Driver-specific)
        by setting the ESRIJSON paging config options.
        Does nothing when startindex is negative.

        :param startindex: index of first record of the page
        :param limit: page size
        """
        if startindex < 0:
            return

        set_option = self.provider.gdal.SetConfigOption
        set_option('ESRIJSON_FEATURE_SERVER_PAGING', 'ON')
        set_option('OGR_ESRIJSON_START_INDEX', str(startindex))
        set_option('OGR_ESRIJSON_PAGE_SIZE', str(limit))

    def disable_paging(self):
        """
        Disable paged access to dataset (OGR Driver-specific)
        by unsetting the paging switch and page size config options.
        """
        set_option = self.provider.gdal.SetConfigOption
        set_option('ESRIJSON_FEATURE_SERVER_PAGING', None)
        set_option('OGR_ESRIJSON_PAGE_SIZE', None)
class WFSHelper(SourceHelper):
    """
    SourceHelper for the WFS Source type: controls paging
    through GDAL config options.
    """

    def __init__(self, provider):
        """
        Initialize object

        :param provider: provider instance

        :returns: pygeoapi.provider.ogr.WFSHelper
        """
        SourceHelper.__init__(self, provider)

    def enable_paging(self, startindex=-1, limit=-1):
        """
        Enable paged access to dataset (OGR Driver-specific)
        by setting the WFS paging config options.
        Does nothing when startindex is negative.

        :param startindex: index of first record of the page
        :param limit: page size
        """
        if startindex < 0:
            return

        set_option = self.provider.gdal.SetConfigOption
        set_option('OGR_WFS_PAGING_ALLOWED', 'ON')
        set_option('OGR_WFS_BASE_START_INDEX', str(startindex))
        set_option('OGR_WFS_PAGE_SIZE', str(limit))

    def disable_paging(self):
        """
        Disable paged access to dataset (OGR Driver-specific)
        by unsetting the paging switch and page size config options.
        """
        set_option = self.provider.gdal.SetConfigOption
        set_option('OGR_WFS_PAGING_ALLOWED', None)
        set_option('OGR_WFS_PAGE_SIZE', None)