Source code for sql_statements

# -*- coding: utf-8 -*-
# Copyright (C) 2017, Wolfgang Scherer, <Wolfgang.Scherer at gmx.de>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Alternatively, permisssion is granted to use this file under the
# provisions of the MIT license:
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""\
Generate SQL statements from SQLAlchemy queries, INSERT and UPDATE
statements.

User API:

- :func:`table_dump_sql`
- :func:`sql_statement`
- :func:`fix_null_params`
- :func:`fix_null_values`
- :func:`null_if_none`
- :class:`LiteralDialect`
- :data:`COMPILE_KWARGS`

Some code incorporated from `python - SQLAlchemy: print the actual query - Stack Overflow`_.

Automatic Exports
-----------------

>>> for ex in __all__: printf(sformat('from {0} import {1}', __name__, ex))
from sql_statements import table_dump_sql
from sql_statements import sql_statement
from sql_statements import fix_null_params
from sql_statements import fix_null_values
from sql_statements import null_if_none
from sql_statements import LiteralDialect
from sql_statements import COMPILE_KWARGS

Explicit Exports
----------------

>>> if '__all_internal__' in globals():
...   for ex in __all_internal__:
...     printf(sformat('from {0} import {1}', __name__, ex))
from sql_statements import StringLiteral
from sql_statements import get_compile_kwargs
from sql_statements import literal_pre_process

Details
-------

.. _`python - SQLAlchemy: print the actual query - Stack Overflow`: http://stackoverflow.com/a/5698357/2127439
.. _`python - Sqlalchemy - how to get raw sql from insert(), update() statements with binded params? - Stack Overflow`: http://stackoverflow.com/a/42066590/2127439
"""
__all__ = []
__all_internal__ = []

if __name__ == '__main__':
    # |:here:|
    #ve_activate_ = 've_pyramid_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_0_13_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_1_6_activate.py'
    #ve_activate_ = 've_sqlalchemy_1_2_9_activate.py'
    if 've_activate_' in globals():
        import os
        ve_activate = os.path.join(os.path.dirname(__file__), ve_activate_);
        if os.path.exists(ve_activate):
            globals_ = dict(globals(), __file__=ve_activate)
            exec(compile(open(ve_activate, "rb").read(), ve_activate, 'exec'), globals_)

import datetime

from compat import *
from compat import printe

from sqlalchemy import __version__
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import null

from sqlalchemy.types import (
    NullType,
    Boolean,
    BigInteger,   # inherits from Integer
    SmallInteger, # inherits from Integer
    Integer,
    Date,
    DateTime,
    Time,
    Interval,
    Float,        # inherits from Numeric
    Numeric,
    UnicodeText,  # inherits from Text
    Text,         # inherits from String
    Unicode,      # inherits from String
    Enum,         # inherits from String
    String,
    LargeBinary,  # inherits from _Binary
    _Binary,
    BINARY,       # use for _Binary
    )

# --------------------------------------------------
# |:sec:| FEATURES
# --------------------------------------------------

MOCK_DSN = 'sqlite:///memory'
#MOCK_DSN = 'mysql://localhost/'

# |:info:| things from sqlalchemy.dialects do not work with v0.7.9,
# use engine.dialect from a real engine
def mock_dump(sql, *multiparams, **params):
    return sql.compile(dialect=MOCK_ENGINE.dialect)
MOCK_ENGINE = create_engine(MOCK_DSN, strategy='mock', executor=mock_dump)

__all_internal__.append('literal_pre_process')
[docs]def literal_pre_process(value, type_, dialect): # ||:fnc:|| """ :returns: """ if value is None: return ucs('NULL'), NullType(), True if isinstance(value, (int, long)): # avoid wrong interpretation of Python2 long values as NULL return uc_type(value), type_, True if isinstance(type_, StringLiteral): if isinstance(value, bytes): value = ucs(value, dialect.encoding) # |:info:| ORACLE version of dates # elif isinstance(value, datetime.time): # v = sformat("TO_DATE('{0}','HH24:MI:SS')", ev.strftime("%H:%M:%S")) # done = True # elif isinstance(value, datetime.date): # v = sformat("TO_DATE('{0}','YYYY-MM-DD')", ev.strftime("%Y-%m-%d")) # done = True # elif isinstance(value, datetime.datetime): # v = sformat("TO_DATE('{0}','YYYY-MM-DD HH24:MI:SS')", ev.strftime("%Y-%m-%d %H:%M:%S")) # done = True elif not isinstance(value, (uc_type)): value = uc_type(value) return value, type_, False
# -------------------------------------------------- __all_internal__.append('get_compile_kwargs')
[docs]def get_compile_kwargs(): """Determine, whether `statement.compile` supports keyword argument `compile_kwargs`. Called once to define :data:`COMPILE_KWARGS`.""" from sqlalchemy import Table from sqlalchemy import String, Column from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import insert base = declarative_base() table = Table('test', base.metadata, Column('a', String(10))) stmt = insert(table).values(**dict(((c, '') for c in ('a',)))) try: # |:info:| `literal_binds` works for select with v0.9.0 # `literal_binds` works for insert with at least v1.0.13 ckwargs = {'compile_kwargs': {"literal_binds": True}} compiled = stmt.compile(dialect=MOCK_ENGINE.dialect, **ckwargs) except TypeError: # |:info:| `compile_kwargs` does not work with v0.7.9 ckwargs = {} return ckwargs
__all__.append('COMPILE_KWARGS') COMPILE_KWARGS = get_compile_kwargs() """Keyword arguments for `statement.compile` as determined by :func:`get_compile_kwargs`. If `statement.compile` does not support keyword argument `compile_kwargs`, this is an empty dictionary. Otherwise it is a defined as:: {'compile_kwargs': { 'literal_binds': True}} """ # -------------------------------------------------- # :class:`StringLiteral`, :class:`LiteralDialect` adapted from # http://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query # since v1.0.11 messes up PY2 long values (they are denoted as NullType) # python2/3 compatible. # PY3 = str is not bytes # text = str if PY3 else unicode # int_type = int if PY3 else (int, long) # str_type = str if PY3 else (str, unicode) __all_internal__.append('StringLiteral')
[docs]class StringLiteral(String): # ||:fnc:|| """Teach SA how to literalize various things (StringLiteral). See `python - SQLAlchemy: print the actual query - Stack Overflow`_. """ def __init__(self, *args, **kwargs): # v1.2.9 has a keyword `_enums`. self._enums = kwargs.pop('_enums', None) super(StringLiteral, self).__init__(*args, **kwargs)
[docs] def literal_processor(self, dialect): super_processor = super(StringLiteral, self).literal_processor(dialect) def process(value): result, type_, done = literal_pre_process(value, StringLiteral(), dialect) if not done: result = super_processor(result) if isinstance(result, bytes): result = ucs(result, dialect.encoding) return result return process
#from sqlalchemy.engine.default import DefaultDialect __all__.append('LiteralDialect')
[docs]class LiteralDialect(MOCK_ENGINE.dialect.__class__): """Teach SA how to literalize various things (LiteralDialect). See `python - SQLAlchemy: print the actual query - Stack Overflow`_""" colspecs = { # prevent various encoding explosions String: StringLiteral, # teach SA about how to literalize date, time, datetime Date: StringLiteral, Time: StringLiteral, DateTime: StringLiteral, # don't format py2 long integers to NULL NullType: StringLiteral, }
# -------------------------------------------------- __all__.append('null_if_none')
[docs]def null_if_none(value): # ||:fnc:|| """If value is `None`, return :class:`sqlalchemy.null` instance.""" if value is None: return null() return value
__all__.append('fix_null_values')
[docs]def fix_null_values(values): # ||:fnc:|| """Convert `None` values to :class:`sqlalchemy.null` instances.""" return (null_if_none(v) for v in values)
__all__.append('fix_null_params')
[docs]def fix_null_params(params): # ||:fnc:|| """Convert `None` values to :class:`sqlalchemy.null` instances. :param params: list of (column, value) tuples or dict().""" if issequence(params): # list of (col, val) tuples return ((c, null_if_none(v)) for c, v in params) return type(params)(((c, null_if_none(v)) for c, v in ditems(params)))
__all__.append('sql_statement')
[docs]def sql_statement(stmt, dialect=None): # ||:fnc:|| """Render SQL for a SQA statement. :returns: SQL string. :param stmt: SQLAlchemy ORM query or select, insert, update expression. :param dialect: defaults to :class:`LiteralDialect`. See `Test Environment`_ for defintions. **Check correct rendering of various data types** >>> from sqlalchemy import select, insert, update >>> from sql_statements_test import * >>> statement = select([TestTable]).where(TestTable.c.mycol.in_(TEST_TABLE_DATA)).limit(1) >>> print(nts(sql_statement(statement))) #doctest: +NORMALIZE_WHITESPACE +ELLIPSIS SELECT db_table.mycol, db_table.cboolean, db_table.cinteger, db_table.cfloat, db_table.cdate, db_table.ctime, db_table.cdatetime, db_table.cstring, db_table.ctext, db_table.cnull, db_table.cenum FROM db_table WHERE db_table.mycol IN (5, 1, 100000000000000000000, 3.14159, '2016-10-03', '13:45:00', '2015-06-24 18:09:29.042517', 'UTF-8 snowman: ☃', 'snowman: ☃', NULL, 'foo') LIMIT 1...; **Check correct rendering of INSERT, UPDATE** >>> insert_stmt = insert(TestTableORM.__table__).values(a=1, b=2) >>> print(sql_statement(insert_stmt)) INSERT INTO db_table_orm (b, a) VALUES (2, 1); >>> update_stmt = update(TestTableORM.__table__).where(TestTableORM.a == 1).values(b=2) >>> print(sql_statement(update_stmt)) UPDATE db_table_orm SET b=2 WHERE db_table_orm.a = 1; See also `python - SQLAlchemy: print the actual query - Stack Overflow`_ and `python - Sqlalchemy - how to get raw sql from insert(), update() statements with binded params? - Stack Overflow`_. """ if dialect is None: dialect = LiteralDialect() # |:info:| v1.0.13+ does not like `None` as literal bind value: # "CompileError: Bind parameter 'cnull' without a renderable value not allowed here." try: _parameters = stmt.parameters except AttributeError: # select statements don't have `parameters` _parameters = None if _parameters: stmt.parameters = type(_parameters)(( (k, null_if_none(v)) for k, v in ditems(_parameters))) compiled = stmt.compile(dialect=dialect, **COMPILE_KWARGS) if _parameters: stmt.parameters = _parameters if compiled.params: # bind parameters were not expanded to literal values, so do it yourself if dialect.default_paramstyle == 'format': _sep = '%s' else: _sep = '?' parts = uc_type(compiled).split(_sep) stmt_ = [parts.pop(0)] bind_names = compiled.bind_names bind_params = dict((bind_names[k], k) for k in bind_names) for bind_value, part in zip([bind_params[n] for n in compiled.positiontup], parts): value = bind_value.effective_value type_ = bind_value.type etype__ = dialect.colspecs.get(type(type_)) if etype__: etype_ = etype__() else: etype_ = type_ value, etype_, done = literal_pre_process(value, etype_, dialect) if not done: result = compiled.render_literal_value(value, etype_) if isinstance(result, bytes): result = ucs(result, dialect.encoding) else: result = value stmt_.extend((result, part)) return ucs('').join(stmt_) + ucs(';') return nts(compiled.__str__()) + ';'
__all__.append('table_dump_sql')
[docs]def table_dump_sql(table, rows, columns=None, update=None, no_create=None, dialect=None): # ||:fnc:|| """Dump sqlalchemy table and data rows as SQL CREATE TABLE and INSERT/UPDATE statements.. :param table: SQLAlchemy :class:`Table` or ORM table :param rows: list of row data lists :param columns: list of column names matching the rows :param update: generate UPDATE statements instead of INSERT. :param no_create: do not generate CREATE TABLE DDL expression. :param dialect: defaults to :class:`LiteralDialect`. Handles both SQL tables and ORM tables transparently. See `Test Environment`_ for defintions. >>> from sqlalchemy import select, insert, update >>> from sql_statements_test import * **Check SQL table** >>> print(nts(table_dump_sql(TestTable, (TEST_TABLE_DATA, )))) #doctest: +ELLIPSIS CREATE TABLE db_table ( mycol INTEGER NOT NULL..., cboolean BOOL..., cinteger INTEGER, cfloat FLOAT, cdate DATE, ctime TIME, cdatetime DATETIME, cstring VARCHAR(50), ctext TEXT, cnull INTEGER, cenum..., PRIMARY KEY (mycol), CHECK (cboolean IN (0, 1))... ); INSERT INTO db_table (mycol, cboolean, cinteger, cfloat, cdate, ctime, cdatetime, cstring, ctext, cnull, cenum) VALUES (5, ..., 100000000000000000000, 3.14159, '2016-10-03', '13:45:00', '2015-06-24 18:09:29.042517', 'UTF-8 snowman: ☃', 'snowman: ☃', NULL, 'foo'); **Check ORM table** >>> print(table_dump_sql(TestTableORM, [[1, 10 ** 20, '5'], [2, 6, 'text'], [3, 9, 'done']], 'a b c'.split())) #doctest: +ELLIPSIS CREATE TABLE db_table_orm ( b INTEGER, c VARCHAR(10), a INTEGER NOT NULL..., PRIMARY KEY (a) ); INSERT INTO db_table_orm (b, c, a) VALUES (100000000000000000000, '5', 1); INSERT INTO db_table_orm (b, c, a) VALUES (6, 'text', 2); INSERT INTO db_table_orm (b, c, a) VALUES (9, 'done', 3); **Check Update on ORM table** >>> print(table_dump_sql(TestTableORM, [[1, 10 ** 20, '5'], [2, 6, 'text'], [3, 9, 'done']], 'a b c'.split(), update=True, no_create=True)) UPDATE db_table_orm SET b=100000000000000000000, c='5', a=1 WHERE db_table_orm.a = 1; UPDATE db_table_orm SET b=6, c='text', a=2 WHERE db_table_orm.a = 2; UPDATE db_table_orm SET b=9, c='done', a=3 WHERE db_table_orm.a = 3; """ import re import sqlalchemy from sqlalchemy.schema import CreateTable if columns is None: columns = [c.name for c in getattr(table, '__table__', table).c] if dialect is None: dialect = LiteralDialect() sql_dump = [] # |:info:| insert(orm_table) does not work with v0.7.9, # must use insert(orm_table.__table__) the_table = getattr(table, '__table__', table) if not no_create: sql_dump.append(re.sub( '\\t', ' ', re.sub( '[ \\t\\r]+$(?m)', '', str( CreateTable(the_table).compile( dialect=dialect)).strip() + ';'))) if update: insert_or_update = sqlalchemy.update primary_keys = [c for c in the_table.c if c.primary_key] else: insert_or_update = sqlalchemy.insert primary_keys = None for row in rows: data = dict(zip(columns, row)) stmt = insert_or_update(the_table).values(**data) if primary_keys: from sqlalchemy import and_ stmt = stmt.where(and_(*[c == data.get(c.name) for c in primary_keys])) sql_dump.append(sql_statement(stmt, dialect=dialect)) return '\n'.join(sql_dump)
if '__all_internal__' in globals(): import sys if 'sphinx.ext.autodoc' in sys.modules: printe('HERE') __all__[:0] = __all_internal__ __all_internal__ = list(reversed(__all_internal__)) __all__ = list(reversed(__all__)) if __name__ == '__main__': printe(sformat(' {0:<8s} {1!s:<43s}', __version__, COMPILE_KWARGS)) if '_canonize_module_' in globals(): _canonize_module_(__name__, True) import doctest doctest.testmod() # :ide: COMPILE: Run Python3 w/o args # . (progn (save-buffer) (compile (concat "python3 ./" (file-name-nondirectory (buffer-file-name)) ""))) # :ide: COMPILE: Run w/o args # . (progn (save-buffer) (compile (concat "python ./" (file-name-nondirectory (buffer-file-name)) ""))) # :ide: +-#+ # . Compile ()