# =================================================================
#
# Authors: Jorge Samuel Mendes de Jesus <jorge.dejesus@protonmail.net>
# Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2018 Jorge Samuel Mendes de Jesus
# Copyright (c) 2019 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import sqlite3
import logging
import os
import json
from pygeoapi.plugin import InvalidPluginError
from pygeoapi.provider.base import BaseProvider, ProviderConnectionError
LOGGER = logging.getLogger(__name__)
[docs]class SQLiteGPKGProvider(BaseProvider):
"""Generic provider for SQLITE and GPKG using sqlite3 module.
This module requires install of libsqlite3-mod-spatialite
TODO: DELETE, UPDATE, CREATE
"""
def __init__(self, provider_def):
"""
SQLiteGPKGProvider Class constructor
:param provider_def: provider definitions from yml pygeoapi-config.
data,id_field, name set in parent class
:returns: pygeoapi.providers.base.SQLiteProvider
"""
BaseProvider.__init__(self, provider_def)
self.table = provider_def['table']
self.application_id = None
self.geom_col = None
LOGGER.debug('Setting SQLite properties:')
LOGGER.debug('Data source: {}'.format(self.data))
LOGGER.debug('Name: {}'.format(self.name))
LOGGER.debug('ID_field: {}'.format(self.id_field))
LOGGER.debug('Table: {}'.format(self.table))
self.cursor = self.__load()
LOGGER.debug('Got cursor from DB')
LOGGER.debug('Get available fields/properties')
self.get_fields()
[docs] def get_fields(self):
"""
Get fields from sqlite table (columns are field)
:returns: dict of fields
"""
if not self.fields:
results = self.cursor.execute(
'PRAGMA table_info({})'.format(self.table)).fetchall()
[self.fields.update(
{item["name"]:item["type"].lower()}
) for item in results]
return self.fields
def __response_feature(self, row_data):
"""
Assembles GeoJSON output from DB query
:param row_data: DB row result
:returns: `dict` of GeoJSON Feature
"""
rd = dict(row_data) # sqlite3.Row is doesnt support pop
feature = {
'type': 'Feature'
}
feature["geometry"] = json.loads(
rd.pop('AsGeoJSON({})'.format(self.geom_col))
)
feature['properties'] = rd
feature['id'] = feature['properties'].pop(self.id_field)
return feature
def __response_feature_hits(self, hits):
"""Assembles GeoJSON/Feature number
:returns: GeoJSON FeaturesCollection
"""
feature_collection = {"features": [],
"type": "FeatureCollection"}
feature_collection['numberMatched'] = hits
return feature_collection
def __load(self):
"""
Private method for loading spatiallite,
get the table structure and dump geometry
:returns: sqlite3.Cursor
"""
if (os.path.exists(self.data)):
conn = sqlite3.connect(self.data)
else:
LOGGER.error('Path to sqlite does not exist')
raise InvalidPluginError()
try:
conn.enable_load_extension(True)
except AttributeError as err:
LOGGER.error('Extension loading not enabled: {}'.format(err))
raise ProviderConnectionError()
conn.row_factory = sqlite3.Row
conn.enable_load_extension(True)
# conn.set_trace_callback(LOGGER.debug)
cursor = conn.cursor()
try:
cursor.execute("SELECT load_extension('mod_spatialite.so')")
except sqlite3.OperationalError as err:
LOGGER.error('Extension loading error: {}'.format(err))
raise ProviderConnectionError()
result = cursor.fetchall()
# Checking for geopackage
cursor.execute("PRAGMA application_id")
result = cursor.fetchone()
self.application_id = result["application_id"]
if self.application_id == 1196444487:
LOGGER.info("Detected GPKG 1.2 and greater")
elif self.application_id == 1196437808:
LOGGER.info("Detected GPKG 1.0 or 1.1")
else:
LOGGER.info("No GPKG detected assuming spatial sqlite3")
self.application_id = 0
if self.application_id:
cursor.execute("SELECT AutoGPKGStart()")
result = cursor.fetchall()
if result[0][0] == 1:
LOGGER.info("Loaded Geopackage support")
else:
LOGGER.info("SELECT AutoGPKGStart() returned 0." +
"Detected GPKG but couldnt load support")
raise InvalidPluginError
if self.application_id:
self.geom_col = "geom"
else:
self.geom_col = "geometry"
try:
cursor.execute('PRAGMA table_info({})'.format(self.table))
result = cursor.fetchall()
except sqlite3.OperationalError:
LOGGER.error('Couldnt find table: {}'.format(self.table))
raise ProviderConnectionError()
try:
assert len(result), 'Table not found'
assert len([item for item in result
if self.id_field in item]), 'id_field not present'
except AssertionError:
raise InvalidPluginError
self.columns = [item[1] for item in result if item[1] != self.geom_col]
self.columns = ','.join(self.columns)+',AsGeoJSON({})'.format(
self.geom_col)
if self.application_id:
self.table = "vgpkg_{}".format(self.table)
return cursor
[docs] def query(self, startindex=0, limit=10, resulttype='results',
bbox=[], datetime=None, properties=[], sortby=[]):
"""
Query SQLite/GPKG for all the content.
e,g: http://localhost:5000/collections/countries/items?
limit=5&startindex=2&resulttype=results&continent=Europe&admin=Albania&bbox=29.3373,-3.4099,29.3761,-3.3924
http://localhost:5000/collections/countries/items?continent=Africa&bbox=29.3373,-3.4099,29.3761,-3.3924
:param startindex: starting record to return (default 0)
:param limit: number of records to return (default 10)
:param resulttype: return results or hit limit (default results)
:param bbox: bounding box [minx,miny,maxx,maxy]
:param datetime: temporal (datestamp or extent)
:param properties: list of tuples (name, value)
:param sortby: list of dicts (property, order)
:returns: GeoJSON FeaturesCollection
"""
LOGGER.debug('Querying SQLite/GPKG')
if resulttype == 'hits':
res = self.cursor.execute(
"select count(*) as hits from {};".format(self.table))
hits = res.fetchone()["hits"]
return self.__response_feature_hits(hits)
where_syntax = " where " if (properties or bbox) else ""
where_values = tuple()
if properties:
where_syntax += " and ".join(
["{}=?".format(k) for k, v in properties])
where_values += where_values + tuple((v for k, v in properties))
if bbox:
if properties:
where_syntax += " and "
# TODO: check name of geometry column
where_syntax += " Intersects({}, \
BuildMbr(?,?,?,?)) ".format(self.geom_col)
where_values += tuple(bbox)
sql_query = "select {} from \
{} {} limit ? offset ?".format(
self.columns, self.table, where_syntax)
end_index = startindex + limit
LOGGER.debug('SQL Query: {}'.format(sql_query))
LOGGER.debug('Start Index: {}'.format(startindex))
LOGGER.debug('End Index: {}'.format(end_index))
row_data = self.cursor.execute(
sql_query, where_values + (limit, startindex))
feature_collection = {
'type': 'FeatureCollection',
'features': []
}
for rd in row_data:
feature_collection['features'].append(
self.__response_feature(rd))
return feature_collection
[docs] def get(self, identifier):
"""
Query the provider for a specific
feature id e.g: /collections/countries/items/1
:param identifier: feature id
:returns: GeoJSON FeaturesCollection
"""
LOGGER.debug('Get item from SQLite/GPKG')
sql_query = 'select {} from \
{} where {}==?;'.format(
self.columns, self.table, self.id_field)
LOGGER.debug('SQL Query: {}'.format(sql_query))
LOGGER.debug('Identifier: {}'.format(identifier))
row_data = self.cursor.execute(sql_query, (identifier, )).fetchone()
feature = self.__response_feature(row_data)
return feature
def __repr__(self):
return '<SQLiteGPKGProvider> {}, {}'.format(self.data, self.table)