# -*- coding: utf-8 -*-
# Copyright (C) 2017, Wolfgang Scherer, <Wolfgang.Scherer at gmx.de>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Alternatively, permission is granted to use this file under the
# provisions of the MIT license:
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""\
Generate SQL statements from SQLAlchemy queries, INSERT and UPDATE
statements.
User API:
- :func:`table_dump_sql`
- :func:`sql_statement`
- :func:`fix_null_params`
- :func:`fix_null_values`
- :func:`null_if_none`
- :class:`LiteralDialect`
- :data:`COMPILE_KWARGS`
Some code incorporated from `python - SQLAlchemy: print the actual query - Stack Overflow`_.
Automatic Exports
-----------------
>>> for ex in __all__: printf(sformat('from {0} import {1}', __name__, ex))
from sql_statements import table_dump_sql
from sql_statements import sql_statement
from sql_statements import fix_null_params
from sql_statements import fix_null_values
from sql_statements import null_if_none
from sql_statements import LiteralDialect
from sql_statements import COMPILE_KWARGS
Explicit Exports
----------------
>>> if '__all_internal__' in globals():
... for ex in __all_internal__:
... printf(sformat('from {0} import {1}', __name__, ex))
from sql_statements import StringLiteral
from sql_statements import get_compile_kwargs
from sql_statements import literal_pre_process
Details
-------
.. _`python - SQLAlchemy: print the actual query - Stack Overflow`: http://stackoverflow.com/a/5698357/2127439
.. _`python - Sqlalchemy - how to get raw sql from insert(), update() statements with binded params? - Stack Overflow`: http://stackoverflow.com/a/42066590/2127439
"""
__all__ = []
__all_internal__ = []
if __name__ == '__main__':
    # Optional bootstrap when run as a script: uncomment exactly one
    # `ve_activate_` line below to execute that activation script (looked up
    # in the directory of this file) before the imports that follow.
    # |:here:|
    #ve_activate_ = 've_pyramid_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_0_13_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_1_6_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_2_9_activate.py'
    if 've_activate_' in globals():
        import os
        ve_activate = os.path.join(os.path.dirname(__file__), ve_activate_)
        if os.path.exists(ve_activate):
            # Run the script in a copy of the module globals in which
            # `__file__` names the activation script itself.
            globals_ = dict(globals(), __file__=ve_activate)
            # Read through a context manager so the file handle is closed
            # right away instead of being left to the garbage collector.
            with open(ve_activate, "rb") as ve_file:
                ve_source = ve_file.read()
            exec(compile(ve_source, ve_activate, 'exec'), globals_)
import datetime
from compat import *
from compat import printe
from sqlalchemy import __version__
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import null
from sqlalchemy.types import (
NullType,
Boolean,
BigInteger, # inherits from Integer
SmallInteger, # inherits from Integer
Integer,
Date,
DateTime,
Time,
Interval,
Float, # inherits from Numeric
Numeric,
UnicodeText, # inherits from Text
Text, # inherits from String
Unicode, # inherits from String
Enum, # inherits from String
String,
LargeBinary, # inherits from _Binary
_Binary,
BINARY, # use for _Binary
)
# --------------------------------------------------
# |:sec:| FEATURES
# --------------------------------------------------
MOCK_DSN = 'sqlite:///memory'
#MOCK_DSN = 'mysql://localhost/'
# |:info:| things from sqlalchemy.dialects do not work with v0.7.9,
# use engine.dialect from a real engine
def mock_dump(sql, *multiparams, **params):
    """Compile `sql` with the dialect of :data:`MOCK_ENGINE`.

    :returns: the compiled statement object.
    :param sql: object providing a `compile(dialect=...)` method.
    :param multiparams: accepted, but not used.
    :param params: accepted, but not used.
    """
    mock_dialect = MOCK_ENGINE.dialect
    return sql.compile(dialect=mock_dialect)
MOCK_ENGINE = create_engine(MOCK_DSN, strategy='mock', executor=mock_dump)
__all_internal__.append('literal_pre_process')
def literal_pre_process(value, type_, dialect): # ||:fnc:||
    """Pre-process a bind value before it is rendered as SQL literal.

    :returns: tuple `(value, type_, done)`. If `done` is `True`,
      `value` is already the final SQL text and must not be processed
      any further. Otherwise `value` still has to be rendered by the
      literal processor for `type_`.
    :param value: Python value of the bind parameter.
    :param type_: SQLAlchemy type instance of the bind parameter.
    :param dialect: dialect, whose `encoding` is used to decode
      `bytes` values for :class:`StringLiteral` types.

    - `None` becomes the text `NULL` with a :class:`NullType`.
    - Integer values are converted to text directly.
    - For :class:`StringLiteral` types, every other value is converted
      to a text string.
    - Values for all other types are passed through unchanged.
    """
    if value is None:
        return ucs('NULL'), NullType(), True
    if isinstance(value, (int, long)):
        # avoid wrong interpretation of Python2 long values as NULL
        return uc_type(value), type_, True
    if not isinstance(type_, StringLiteral):
        # not a string literal type, leave the value alone
        return value, type_, False
    # |:info:| ORACLE version of dates
    # elif isinstance(value, datetime.time):
    #     v = sformat("TO_DATE('{0}','HH24:MI:SS')", ev.strftime("%H:%M:%S"))
    #     done = True
    # elif isinstance(value, datetime.date):
    #     v = sformat("TO_DATE('{0}','YYYY-MM-DD')", ev.strftime("%Y-%m-%d"))
    #     done = True
    # elif isinstance(value, datetime.datetime):
    #     v = sformat("TO_DATE('{0}','YYYY-MM-DD HH24:MI:SS')", ev.strftime("%Y-%m-%d %H:%M:%S"))
    #     done = True
    if isinstance(value, bytes):
        text_value = ucs(value, dialect.encoding)
    elif isinstance(value, uc_type):
        text_value = value
    else:
        text_value = uc_type(value)
    return text_value, type_, False
# --------------------------------------------------
__all_internal__.append('get_compile_kwargs')
def get_compile_kwargs():
    """Probe, whether `statement.compile` supports keyword argument `compile_kwargs`.

    Called once to define :data:`COMPILE_KWARGS`.

    :returns: ``{'compile_kwargs': {'literal_binds': True}}``, if a
      trial INSERT statement compiles with these keyword arguments, or
      an empty dictionary, if the attempt raises :exc:`TypeError`.
    """
    from sqlalchemy import Table
    from sqlalchemy import String, Column
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import insert
    probe_table = Table('test', declarative_base().metadata, Column('a', String(10)))
    probe_stmt = insert(probe_table).values(a='')
    # |:info:| `literal_binds` works for select with v0.9.0
    #          `literal_binds` works for insert with at least v1.0.13
    probe_kwargs = {'compile_kwargs': {"literal_binds": True}}
    try:
        # only the success or failure of the call matters, not its result
        probe_stmt.compile(dialect=MOCK_ENGINE.dialect, **probe_kwargs)
    except TypeError:
        # |:info:| `compile_kwargs` does not work with v0.7.9
        return {}
    return probe_kwargs
__all__.append('COMPILE_KWARGS')
COMPILE_KWARGS = get_compile_kwargs()
"""Keyword arguments for `statement.compile` as determined by :func:`get_compile_kwargs`.
If `statement.compile` does not support keyword argument
`compile_kwargs`, this is an empty dictionary. Otherwise it is
defined as::
{'compile_kwargs': { 'literal_binds': True}}
"""
# --------------------------------------------------
# :class:`StringLiteral`, :class:`LiteralDialect` adapted from
# http://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
# since v1.0.11 messes up PY2 long values (they are denoted as NullType)
# python2/3 compatible.
# PY3 = str is not bytes
# text = str if PY3 else unicode
# int_type = int if PY3 else (int, long)
# str_type = str if PY3 else (str, unicode)
__all_internal__.append('StringLiteral')
class StringLiteral(String): # ||:fnc:||
    """Teach SA how to literalize various things (StringLiteral).

    String type, whose literal processor runs each value through
    :func:`literal_pre_process` before the regular string literal
    processor is applied.

    See `python - SQLAlchemy: print the actual query - Stack Overflow`_.
    """

    def __init__(self, *args, **kwargs):
        """Remove keyword `_enums` and pass everything else to `String`."""
        # v1.2.9 has a keyword `_enums`.
        self._enums = kwargs.pop('_enums', None)
        super(StringLiteral, self).__init__(*args, **kwargs)

    def literal_processor(self, dialect):
        """Get a function rendering a Python value as SQL literal text.

        :returns: function `process(value)`.
        :param dialect: dialect passed to the literal processor of the
          base class; its `encoding` is used to decode `bytes` results.
        """
        string_processor = super(StringLiteral, self).literal_processor(dialect)

        def process(value):
            rendered, _type, finished = literal_pre_process(value, StringLiteral(), dialect)
            if finished:
                # NULL and integer values are already final SQL text
                return rendered
            rendered = string_processor(rendered)
            if isinstance(rendered, bytes):
                rendered = ucs(rendered, dialect.encoding)
            return rendered

        return process
#from sqlalchemy.engine.default import DefaultDialect
__all__.append('LiteralDialect')
class LiteralDialect(MOCK_ENGINE.dialect.__class__):
    """Teach SA how to literalize various things (LiteralDialect).

    Subclass of the dialect class of :data:`MOCK_ENGINE`, whose
    `colspecs` map the types listed below to :class:`StringLiteral`.

    See `python - SQLAlchemy: print the actual query - Stack Overflow`_
    """
    # Types rendered via StringLiteral, and why:
    # - String: prevent various encoding explosions
    # - Date, Time, DateTime: teach SA about how to literalize them
    # - NullType: don't format py2 long integers to NULL
    colspecs = dict.fromkeys((String, Date, Time, DateTime, NullType), StringLiteral)
# --------------------------------------------------
__all__.append('null_if_none')
def null_if_none(value): # ||:fnc:||
    """Replace `None` by a :class:`sqlalchemy.null` instance.

    :returns: result of `null()`, if `value` is `None`, otherwise
      `value` itself.
    """
    return null() if value is None else value
__all__.append('fix_null_values')
def fix_null_values(values): # ||:fnc:||
    """Convert `None` values to :class:`sqlalchemy.null` instances.

    :returns: generator yielding the items of `values`, with each
      `None` replaced by the result of `null()`.
    :param values: iterable of values.
    """
    return (null() if item is None else item for item in values)
__all__.append('fix_null_params')
def fix_null_params(params): # ||:fnc:||
    """Convert `None` values to :class:`sqlalchemy.null` instances.

    :returns: for a sequence, a generator of (column, value) tuples;
      otherwise a new object of the same type as `params`, built from
      the converted (column, value) tuples.
    :param params: list of (column, value) tuples or dict().
    """
    if not issequence(params):
        # mapping: rebuild a container of the same type
        return type(params)(
            ((column, null_if_none(value)) for column, value in ditems(params)))
    # list of (col, val) tuples
    return ((column, null_if_none(value)) for column, value in params)
__all__.append('sql_statement')
def sql_statement(stmt, dialect=None): # ||:fnc:||
    """Render SQL for a SQA statement.

    :returns: SQL string, terminated by a semicolon.
    :param stmt: SQLAlchemy ORM query or select, insert, update expression.
    :param dialect: defaults to :class:`LiteralDialect`.

    While the statement is compiled, `None` values in `stmt.parameters`
    are temporarily replaced by :class:`sqlalchemy.null` instances. The
    original parameters are restored afterwards, even if compilation
    fails.

    See `Test Environment`_ for definitions.

    **Check correct rendering of various data types**

    >>> from sqlalchemy import select, insert, update
    >>> from sql_statements_test import *

    >>> statement = select([TestTable]).where(TestTable.c.mycol.in_(TEST_TABLE_DATA)).limit(1)
    >>> print(nts(sql_statement(statement))) #doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    SELECT db_table.mycol, db_table.cboolean, db_table.cinteger, db_table.cfloat, db_table.cdate, db_table.ctime, db_table.cdatetime, db_table.cstring, db_table.ctext, db_table.cnull, db_table.cenum
    FROM db_table
    WHERE db_table.mycol IN (5, 1, 100000000000000000000, 3.14159, '2016-10-03', '13:45:00', '2015-06-24 18:09:29.042517', 'UTF-8 snowman: ☃', 'snowman: ☃', NULL, 'foo')
    LIMIT 1...;

    **Check correct rendering of INSERT, UPDATE**

    >>> insert_stmt = insert(TestTableORM.__table__).values(a=1, b=2)
    >>> print(sql_statement(insert_stmt))
    INSERT INTO db_table_orm (b, a) VALUES (2, 1);

    >>> update_stmt = update(TestTableORM.__table__).where(TestTableORM.a == 1).values(b=2)
    >>> print(sql_statement(update_stmt))
    UPDATE db_table_orm SET b=2 WHERE db_table_orm.a = 1;

    See also `python - SQLAlchemy: print the actual query - Stack Overflow`_ and
    `python - Sqlalchemy - how to get raw sql from insert(), update() statements with binded params? - Stack Overflow`_.
    """
    if dialect is None:
        dialect = LiteralDialect()
    # |:info:| v1.0.13+ does not like `None` as literal bind value:
    # "CompileError: Bind parameter 'cnull' without a renderable value not allowed here."
    try:
        _parameters = stmt.parameters
    except AttributeError:
        # select statements don't have `parameters`
        _parameters = None
    if _parameters:
        stmt.parameters = type(_parameters)((
            (k, null_if_none(v)) for k, v in ditems(_parameters)))
    try:
        compiled = stmt.compile(dialect=dialect, **COMPILE_KWARGS)
    finally:
        # never leave the caller's statement with substituted parameters,
        # not even when compilation raises
        if _parameters:
            stmt.parameters = _parameters
    if compiled.params:
        # bind parameters were not expanded to literal values, so do it yourself
        if dialect.default_paramstyle == 'format':
            _sep = '%s'
        else:
            _sep = '?'
        # split the SQL text at the placeholders; the text before the
        # first placeholder starts the result
        parts = uc_type(compiled).split(_sep)
        stmt_ = [parts.pop(0)]
        # map bind names back to their bind parameter objects
        bind_names = compiled.bind_names
        bind_params = dict((bind_names[k], k) for k in bind_names)
        for bind_value, part in zip([bind_params[n] for n in compiled.positiontup], parts):
            value = bind_value.effective_value
            type_ = bind_value.type
            # use the dialect's replacement type, if there is one
            etype__ = dialect.colspecs.get(type(type_))
            if etype__:
                etype_ = etype__()
            else:
                etype_ = type_
            value, etype_, done = literal_pre_process(value, etype_, dialect)
            if not done:
                result = compiled.render_literal_value(value, etype_)
                if isinstance(result, bytes):
                    result = ucs(result, dialect.encoding)
            else:
                result = value
            # literal value, followed by the SQL text up to the next placeholder
            stmt_.extend((result, part))
        return ucs('').join(stmt_) + ucs(';')
    return nts(compiled.__str__()) + ';'
__all__.append('table_dump_sql')
def table_dump_sql(table, rows, columns=None, update=None, no_create=None, dialect=None): # ||:fnc:||
    """Dump sqlalchemy table and data rows as SQL CREATE TABLE and INSERT/UPDATE statements.

    :returns: SQL statements joined by newlines.
    :param table: SQLAlchemy :class:`Table` or ORM table
    :param rows: list of row data lists
    :param columns: list of column names matching the rows.
      Defaults to the names of all table columns.
    :param update: generate UPDATE statements instead of INSERT.
      The WHERE clause matches the primary key columns against the row
      data. If the table has no primary key columns, the UPDATE
      statements are generated without WHERE clause.
    :param no_create: do not generate CREATE TABLE DDL expression.
    :param dialect: defaults to :class:`LiteralDialect`.

    Handles both SQL tables and ORM tables transparently.

    See `Test Environment`_ for definitions.

    >>> from sqlalchemy import select, insert, update
    >>> from sql_statements_test import *

    **Check SQL table**

    >>> print(nts(table_dump_sql(TestTable, (TEST_TABLE_DATA, )))) #doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    CREATE TABLE db_table (
        mycol INTEGER NOT NULL...,
        cboolean BOOL...,
        cinteger INTEGER,
        cfloat FLOAT,
        cdate DATE,
        ctime TIME,
        cdatetime DATETIME,
        cstring VARCHAR(50),
        ctext TEXT,
        cnull INTEGER,
        cenum...,
        PRIMARY KEY (mycol),
        CHECK (cboolean IN (0, 1))...
    );
    INSERT INTO db_table (mycol, cboolean, cinteger, cfloat, cdate, ctime, cdatetime, cstring, ctext, cnull, cenum) VALUES (5, ..., 100000000000000000000, 3.14159, '2016-10-03', '13:45:00', '2015-06-24 18:09:29.042517', 'UTF-8 snowman: ☃', 'snowman: ☃', NULL, 'foo');

    **Check ORM table**

    >>> print(table_dump_sql(TestTableORM, [[1, 10 ** 20, '5'], [2, 6, 'text'], [3, 9, 'done']], 'a b c'.split())) #doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    CREATE TABLE db_table_orm (
        b INTEGER,
        c VARCHAR(10),
        a INTEGER NOT NULL...,
        PRIMARY KEY (a)
    );
    INSERT INTO db_table_orm (b, c, a) VALUES (100000000000000000000, '5', 1);
    INSERT INTO db_table_orm (b, c, a) VALUES (6, 'text', 2);
    INSERT INTO db_table_orm (b, c, a) VALUES (9, 'done', 3);

    **Check Update on ORM table**

    >>> print(table_dump_sql(TestTableORM, [[1, 10 ** 20, '5'], [2, 6, 'text'], [3, 9, 'done']], 'a b c'.split(), update=True, no_create=True))
    UPDATE db_table_orm SET b=100000000000000000000, c='5', a=1 WHERE db_table_orm.a = 1;
    UPDATE db_table_orm SET b=6, c='text', a=2 WHERE db_table_orm.a = 2;
    UPDATE db_table_orm SET b=9, c='done', a=3 WHERE db_table_orm.a = 3;
    """
    import re
    import sqlalchemy
    from sqlalchemy.schema import CreateTable
    # |:info:| insert(orm_table) does not work with v0.7.9,
    # must use insert(orm_table.__table__)
    the_table = getattr(table, '__table__', table)
    if columns is None:
        columns = [c.name for c in the_table.c]
    if dialect is None:
        dialect = LiteralDialect()
    sql_dump = []
    if not no_create:
        create_sql = str(CreateTable(the_table).compile(dialect=dialect)).strip() + ';'
        # Strip trailing whitespace from every line. The inline flag has to
        # lead the pattern: a global flag anywhere else is deprecated since
        # Python 3.6 and an error since Python 3.11.
        create_sql = re.sub(r'(?m)[ \t\r]+$', '', create_sql)
        sql_dump.append(re.sub('\\t', ' ', create_sql))
    if update:
        from sqlalchemy import and_
        insert_or_update = sqlalchemy.update
        primary_keys = [c for c in the_table.c if c.primary_key]
    else:
        insert_or_update = sqlalchemy.insert
        primary_keys = None
    for row in rows:
        data = dict(zip(columns, row))
        stmt = insert_or_update(the_table).values(**data)
        if primary_keys:
            # restrict the UPDATE to the row identified by its primary key
            stmt = stmt.where(and_(*[c == data.get(c.name) for c in primary_keys]))
        sql_dump.append(sql_statement(stmt, dialect=dialect))
    return '\n'.join(sql_dump)
# Finalize the export lists: names were appended in definition order,
# the lists are published in reverse order.
if '__all_internal__' in globals():
    import sys
    if 'sphinx.ext.autodoc' in sys.modules:
        # Sphinx autodoc is loaded: export the internal names, too.
        # NOTE(review): the message below looks like leftover debug
        # output -- confirm before removing.
        printe('HERE')
        __all__[:0] = __all_internal__
    __all_internal__ = __all_internal__[::-1]
__all__ = __all__[::-1]

if __name__ == '__main__':
    # Script mode: report SQLAlchemy version and compile keyword
    # arguments, then run the doctests of this module.
    banner = sformat(' {0:<8s} {1!s:<43s}', __version__, COMPILE_KWARGS)
    printe(banner)
    if '_canonize_module_' in globals():
        _canonize_module_(__name__, True)
    import doctest
    doctest.testmod()
# :ide: COMPILE: Run Python3 w/o args
# . (progn (save-buffer) (compile (concat "python3 ./" (file-name-nondirectory (buffer-file-name)) "")))
# :ide: COMPILE: Run w/o args
# . (progn (save-buffer) (compile (concat "python ./" (file-name-nondirectory (buffer-file-name)) "")))
# :ide: +-#+
# . Compile ()