# Source code for ll.orasql

# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True

## Copyright 2004-2021 by LivingLogic AG, Bayreuth/Germany
## Copyright 2004-2021 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license


"""
:mod:`!ll.orasql` contains utilities for working with cx_Oracle__:

*	It allows calling procedures and functions with keyword arguments (via the
	classes :class:`Procedure` and :class:`Function`).

*	Query results will be put into :class:`Record` objects, where database
	fields are accessible as object attributes.

*	The :class:`Connection` class provides methods for iterating through the
	database metadata.

*	Importing this module adds support for URLs with the scheme ``oracle`` to
	:mod:`ll.url`. Examples of these URLs are::

		oracle://user:pwd@db/
		oracle://user:pwd@db/view/
		oracle://user:pwd@db/view/USER_TABLES.sql
		oracle://sys:pwd:sysdba@db/

__ https://oracle.github.io/python-cx_Oracle/
"""


import urllib.request, urllib.parse, urllib.error, datetime, itertools, io, errno, re, unicodedata, decimal
from collections import abc

from cx_Oracle import *

from cx_Oracle import __version__ as __cx_oracle_version__

from ll import misc, url as url_


# Markup format used by the docstrings in this module
__docformat__ = "reStructuredText"


# Fixed timestamp (1970-01-01 00:00:00) used for Oracle "directories"
bigbang = datetime.datetime(1970, 1, 1, 0, 0, 0) # timestamp for Oracle "directories"


# Marker object that can be passed as an ``owner`` argument to mean "any user"
# (see :func:`owned`)
ALL = misc.Const("ALL", "ll.orasql") # marker object for specifying a user


###
### Exceptions
###

class SQLObjectNotFoundError(IOError):
	"""
	Raised when a schema object can't be found in the database.

	The exception is initialized like an :class:`IOError` with the error number
	``errno.ENOENT`` and a message naming the type and full name of the object.
	The object itself is available as the attribute ``obj``.
	"""

	def __init__(self, obj):
		message = f"no such {obj.type}: {obj.getfullname()}"
		super().__init__(errno.ENOENT, message)
		self.obj = obj


class SQLNoSuchObjectError(Exception):
	"""
	Raised when no object with the requested name exists.

	``name`` is the name that was looked up and ``owner`` the owner it was
	looked up for (:const:`None` if no owner was given).
	"""

	def __init__(self, name, owner):
		(self.name, self.owner) = (name, owner)

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__} name={self.name!r} owner={self.owner!r} at {id(self):#x}>"

	def __str__(self):
		message = f"no object named {self.name!r}"
		if self.owner is not None:
			message += f" for owner {self.owner!r}"
		return message


class UnknownModeError(ValueError):
	"""
	Raised when an unsupported value is passed as a ``mode`` argument.

	The offending value is available as the attribute ``mode``.
	"""

	def __init__(self, mode):
		self.mode = mode

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__} mode={self.mode!r} at {id(self):#x}>"

	def __str__(self):
		return "unknown mode " + repr(self.mode)


class ConflictError(ValueError):
	"""
	Raised to report a conflict in an object.

	``object`` is the object the conflict was detected in and ``message`` a
	description of the conflict. Both are stored as attributes of the same name.
	"""

	def __init__(self, object, message):
		(self.object, self.message) = (object, message)

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__} object={self.object!r} message={self.message!r} at {id(self):#x}>"

	def __str__(self):
		return "conflict in " + repr(self.object) + ": " + format(self.message)


###
### Helper classes and functions
###

def makeurl(name):
	"""
	Return ``name`` percent-encoded so that it can be used as a single URL
	path segment.

	``name`` is encoded as UTF-8 and quoted; any ``/`` in ``name`` is replaced
	with ``%2f`` so that it can't be mistaken for a path separator.
	"""
	# ``urllib.parse.quote`` encodes strings as UTF-8 by default. Calling it
	# directly avoids passing ``bytes`` to ``urllib.request.pathname2url``, which
	# only works where that function is a plain ``quote()`` call.
	return urllib.parse.quote(name).replace("/", "%2f")


def sqlliteral(value):
	"""
	Return ``value`` formatted as an Oracle SQL literal.

	Supported are :const:`None` (``null``), :class:`int`,
	:class:`datetime.datetime` (as a ``to_date()`` call with second resolution)
	and :class:`str` (single quotes are doubled). For any other type a
	:exc:`TypeError` is raised.
	"""
	if value is None:
		return "null"
	if isinstance(value, int):
		return str(value)
	if isinstance(value, datetime.datetime):
		timestamp = format(value, "%Y-%m-%d %H:%M:%S")
		return f"to_date('{timestamp}', 'YYYY-MM-DD HH24:MI:SS')"
	if isinstance(value, str):
		escaped = value.replace("'", "''")
		return "'" + escaped + "'"
	raise TypeError(f"unknown type {type(value)!r}")


class Args(dict):
	"""
	An :class:`Args` object is a subclass of :class:`dict` that is used for
	passing arguments to procedures and functions. Both item and attribute
	access (i.e. :meth:`__getitem__` and :meth:`__getattr__`) are available.
	Names are case insensitive (they are stored in lowercase).
	"""

	def __init__(self, arg=None, **kwargs):
		dict.__init__(self)
		self.update(arg, **kwargs)

	def update(self, arg=None, **kwargs):
		"""
		Add the items from ``arg`` and ``kwargs`` to ``self``.

		``arg`` may be a mapping (anything with an :meth:`items` method) or an
		iterable of ``(key, value)`` pairs. All keys are lowercased.
		"""
		if arg is not None:
			# A mapping must be iterated via ``items()``; iterating it directly
			# would yield only the keys and break the pair unpacking below.
			items = getattr(arg, "items", None)
			if callable(items):
				arg = items()
			dict.update(self, ((key.lower(), value) for (key, value) in arg))
		dict.update(self, ((key.lower(), value) for (key, value) in kwargs.items()))

	def __getitem__(self, name):
		return dict.__getitem__(self, name.lower())

	def __setitem__(self, name, value):
		dict.__setitem__(self, name.lower(), value)

	def __delitem__(self, name):
		dict.__delitem__(self, name.lower())

	def __getattr__(self, name):
		# Only called when normal attribute lookup fails, so :class:`dict`
		# methods are never shadowed by items.
		try:
			return self.__getitem__(name)
		except KeyError:
			raise AttributeError(name)

	def __setattr__(self, name, value):
		# Attribute assignment always stores an item
		self.__setitem__(name, value)

	def __delattr__(self, name):
		try:
			self.__delitem__(name)
		except KeyError:
			raise AttributeError(name)

	def __repr__(self):
		items = ", ".join(f"{key}={value!r}" for (key, value) in self.items())
		return f"{self.__class__.__module__}.{self.__class__.__qualname__}({items})"
class LOBStream:
	"""
	A :class:`LOBStream` object provides streamlike access to a ``BLOB`` or
	``CLOB``.

	``value`` is the underlying LOB object; it must provide the methods
	:meth:`read`, :meth:`size` and :meth:`getchunksize`. The attribute ``pos``
	is the current zero-based read position.
	"""

	def __init__(self, value):
		self.value = value
		self.pos = 0

	def _advance(self, size):
		# Move the read position forward by ``size``, but never past the end
		self.pos += size
		end = self.value.size()
		if self.pos >= end:
			self.pos = end

	def readall(self):
		"""
		Read all remaining data from the stream and return it.
		"""
		# The LOB is read with one-based offsets, hence the ``+1``
		result = self.value.read(self.pos+1)
		self.pos = self.value.size()
		return result

	def readchunk(self):
		"""
		Read a chunk of data from the stream and return it. Reading is done in
		optimally sized chunks (as reported by the LOB's :meth:`getchunksize`).
		"""
		size = self.value.getchunksize()
		data = self.value.read(self.pos+1, size)
		self._advance(size)
		return data

	def read(self, size=None):
		"""
		Read ``size`` bytes/characters from the stream and return them. If
		``size`` is :const:`None` all remaining data will be read. If ``size``
		is zero or negative one chunk is read (see :meth:`readchunk`).
		"""
		if size is None:
			return self.readall()
		if size <= 0:
			return self.readchunk()
		data = self.value.read(self.pos+1, size)
		self._advance(size)
		return data

	def reset(self):
		"""
		Reset the stream so that the next :meth:`read` call starts at the
		beginning of the LOB.
		"""
		self.pos = 0

	def seek(self, offset, whence=0):
		"""
		Seek to the position ``offset`` in the LOB. The ``whence`` argument is
		optional and defaults to ``0`` (absolute file positioning); The other
		allowed value is ``1`` (seek relative to the current position).

		The resulting position is clamped to the range from ``0`` to the size
		of the LOB. Any other value for ``whence`` raises a :exc:`ValueError`.
		"""
		if whence == 0:
			self.pos = offset
		elif whence == 1:
			self.pos += offset
		else:
			raise ValueError(f"unknown whence: {whence!r}")
		size = self.value.size()
		if self.pos >= size:
			self.pos = size
		elif self.pos < 0:
			self.pos = 0
def _decodelob(value, readlobs): if value is not None: if readlobs is True or (isinstance(readlobs, int) and value.size() <= readlobs): value = value.read() else: value = LOBStream(value) return value class RecordMaker: def __init__(self, cursor): self._readlobs = cursor.readlobs self._index2name = tuple(d[0].lower() for d in cursor.description) self._name2index = dict(zip(self._index2name, itertools.count())) self._index2conv = tuple(getattr(self, d[1].name, self.DEFAULT) for d in cursor.description) def __call__(self, *row): row = tuple(conv(value) for (conv, value) in zip(self._index2conv, row)) return Record(self._index2name, self._name2index, row) def DB_TYPE_CLOB(self, value): return _decodelob(value, self._readlobs) def DB_TYPE_NCLOB(self, value): return _decodelob(value, self._readlobs) def DB_TYPE_BLOB(self, value): return _decodelob(value, self._readlobs) def DEFAULT(self, value): return value
class Record(tuple, abc.Mapping):
	"""
	A :class:`Record` is a subclass of :class:`tuple` that is used for storing
	results of database fetches and procedure and function calls. Both item and
	attribute access (i.e. :meth:`__getitem__` and :meth:`__getattr__`) are
	available. Field names are case insensitive.

	``index2name`` is the sequence of (lowercase) field names, ``name2index``
	maps each field name to its position and ``values`` are the field values.
	"""

	def __new__(cls, index2name, name2index, values):
		self = tuple.__new__(cls, values)
		self._index2name = index2name
		self._name2index = name2index
		return self

	def __getitem__(self, arg):
		# Strings are field names, everything else is handled like a tuple index
		index = self._name2index[arg.lower()] if isinstance(arg, str) else arg
		return tuple.__getitem__(self, index)

	def __getattr__(self, name):
		try:
			index = self._name2index[name.lower()]
		except KeyError:
			cls = self.__class__
			raise AttributeError(f"{cls.__module__}.{cls.__qualname__} object has no attribute {name!r}")
		return tuple.__getitem__(self, index)

	def ul4_getattr(self, name):
		return getattr(self, name)

	def ul4_hasattr(self, name):
		return name.lower() in self._name2index

	def get(self, name, default=None):
		"""
		Return the value for the field named ``name``. If this field doesn't
		exist in ``self``, return ``default`` instead.
		"""
		key = name.lower()
		if key not in self._name2index:
			return default
		return tuple.__getitem__(self, self._name2index[key])

	def __contains__(self, name):
		return name.lower() in self._name2index

	def keys(self):
		"""
		Return an iterator over field names.
		"""
		return iter(self._index2name)

	def items(self):
		"""
		Return an iterator over (field name, field value) tuples.
		"""
		for (position, fieldname) in enumerate(self._index2name):
			yield (fieldname, tuple.__getitem__(self, position))

	def replace(self, **kwargs):
		"""
		Return a new :class:`Record` with the same fields as ``self``, except
		for those fields given new values by whichever keyword arguments are
		specified.
		"""
		newvalues = list(self)
		for (fieldname, fieldvalue) in kwargs.items():
			position = self._name2index[fieldname.lower()]
			newvalues[position] = fieldvalue
		return self.__class__(self._index2name, self._name2index, newvalues)

	def __repr__(self):
		cls = self.__class__
		fields = ", ".join(f"{key}={value!r}" for (key, value) in self.items())
		return f"<{cls.__module__}.{cls.__qualname__} {fields} at {id(self):#x}>"

	def _repr_pretty_(self, p, cycle):
		# Pretty printer hook: ``p`` is the printer, ``cycle`` is true when
		# ``self`` is part of a reference cycle.
		prefix = f"<{self.__class__.__module__}.{self.__class__.__qualname__}"
		suffix = f"at {id(self):#x}"

		if cycle:
			p.text(f"{prefix} ... {suffix}>")
			return

		with p.group(4, prefix, ">"):
			for (key, value) in self.items():
				p.breakable()
				p.text(f"{key}=")
				p.pretty(value)
			p.breakable()
			p.text(suffix)
class SessionPool(SessionPool):
	"""
	:class:`SessionPool` is a subclass of :class:`cx_Oracle.SessionPool`.

	Unless a ``connectiontype`` keyword argument is passed explicitly,
	:class:`Connection` is used as the connection type.
	"""

	def __init__(self, user=None, password=None, dsn=None, **kwargs):
		# Explicitly passed keyword arguments override the default
		options = {"connectiontype": Connection}
		options.update(kwargs)
		super().__init__(user, password, dsn, **options)

	def connectstring(self):
		return f"{self.username}@{self.dsn}"

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__} object db={self.connectstring()!r} at {id(self):#x}>"
def sqlstr(s):
	# Return the string ``s`` as an SQL string literal (single quotes doubled),
	# or ``null`` if ``s`` is :const:`None`.
	if s is None:
		return "null"
	escaped = s.replace("'", "''")
	return "'" + escaped + "'"
def owned(obj, owner):
	"""
	Do we own the object ``obj`` according to the owner specification
	``owner``? ``owner`` can be:

	:const:`None`
		The current user (i.e. via the view ``USER_OBJECTS``);

	:const:`ALL`
		Any user (via the views ``ALL_OBJECTS`` or ``DBA_OBJECTS``);

	username : string
		The specified user

	:class:`set` or :class:`tuple`
		A set or tuple of usernames. An object must belong to any of the users
		to be considered owned.
	"""
	if owner is ALL:
		return True

	# Objects that have no owner (like :class:`JobClass` objects) are only
	# considered owned when ``owner is ALL``.
	if not isinstance(obj, OwnedSchemaObject):
		return False

	if owner is None:
		return obj.owner is None

	if isinstance(owner, (set, tuple)):
		return any(owned(obj, o) for o in owner)

	if obj.owner != owner:
		return False
	return True
class Connection(Connection):
	"""
	:class:`Connection` is a subclass of :class:`cx_Oracle.Connection`.
	"""

	def __init__(self, *args, **kwargs):
		"""
		Create a new connection. In addition to the parameters supported by
		:func:`cx_Oracle.connect` the following keyword argument is supported.

		``readlobs`` : bool or integer
			If ``readlobs`` is :const:`False` all cursor fetches return
			:class:`LOBStream` objects for LOB objects. If ``readlobs`` is an
			:class:`int` LOBs with a maximum size of ``readlobs`` will be
			returned as :class:`bytes`/:class:`str` objects. If ``readlobs``
			is :const:`True` all LOB values will be returned as
			:class:`bytes`/:class:`str` objects.

		``decimal`` : bool
			If ``decimal`` is :const:`True` numbers will be returned as
			:class:`decimal.Decimal` objects, else :class:`float` will be used.

		Furthermore the ``clientinfo`` will be automatically set to the name
		of the currently running script (except if the ``clientinfo`` keyword
		argument is given and :const:`None`).
		"""
		if "readlobs" in kwargs:
			kwargs = kwargs.copy()
			self.readlobs = kwargs.pop("readlobs", False)
		else:
			self.readlobs = False
		self.decimal = kwargs.pop("decimal", False)
		if self.decimal:
			self.outputtypehandler = self._numbersasdecimal
		# Only the last 64 characters of the script name are used
		clientinfo = kwargs.pop("clientinfo", misc.sysinfo.short_script_name[-64:])
		super().__init__(*args, **kwargs)
		if clientinfo is not None:
			self.clientinfo = clientinfo
			self.commit()
		self.mode = kwargs.get("mode")
		self._ddprefix = None # Do we have access to the ``DBA_*`` views?
		self._ddprefixargs = None # Do we have access to the ``DBA_ARGUMENTS`` view (which doesn't exist in Oracle 10)?

	def _numbersasdecimal(self, cursor, name, defaultType, length, precision, scale):
		# Output type handler: fetch numbers that have a scale as
		# :class:`decimal.Decimal`. Returning :const:`None` (implicitly) keeps
		# the default handling for all other columns.
		if defaultType is DB_TYPE_NUMBER and scale:
			return cursor.var(decimal.Decimal, arraysize=cursor.arraysize)

	def connectstring(self):
		return f"{self.username}@{self.dsn}"

	def cursor(self, readlobs=None):
		"""
		Return a new cursor for this connection. For the meaning of
		``readlobs`` see :meth:`__init__`.
		"""
		return Cursor(self, readlobs=readlobs)

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} db={self.connectstring()!r} at {id(self):#x}>"

	def tables(self, owner=None, mode="flat"):
		"""
		Generator that yields all table definitions in the current users schema
		(or all users schemas). ``mode`` specifies the order in which tables
		will be yielded:

		``"create"``
			Create order, inserting records into the table in this order will
			not violate foreign key constraints.

		``"drop"``
			Drop order, deleting records from the table in this order will not
			violate foreign key constraints.

		``"flat"``
			Unordered.

		``owner`` specifies from which user tables should be yielded. It can be
		:const:`None` (for the current user (the default)), :const:`ALL` (for
		all users) or a user name.

		Tables that are materialized views will be skipped in all cases.

		Any other value for ``mode`` raises an :exc:`UnknownModeError`.
		"""
		if mode not in ("create", "drop", "flat"):
			raise UnknownModeError(mode)

		cursor = self.cursor()
		ddprefix = cursor.ddprefix()

		tables = Table.objects(self, owner)

		if mode == "flat":
			yield from tables
		else:
			done = set()

			tables = {(table.name, table.owner): table for table in tables}

			# NOTE(review): ``mode`` is not consulted below, so ``"create"`` and
			# ``"drop"`` yield the same order: tables that have a foreign key to
			# a table are yielded before that table. Confirm that this is what
			# ``"create"`` is supposed to do.
			def do(table):
				if table not in done:
					done.add(table)
					# Find the tables that have a foreign key to ``table``
					query = f""" select ac1.table_name, decode(ac1.owner, user, null, ac1.owner) as owner from {ddprefix}_constraints ac1, {ddprefix}_constraints ac2 where ac1.constraint_type = 'R' and ac2.table_name = :name and ac2.owner = nvl(:owner, user) and ac1.r_constraint_name = ac2.constraint_name and ac1.r_owner = ac2.owner """
					cursor.execute(query, name=table.name, owner=table.owner)
					for rec in cursor.fetchall():
						try:
							t2 = tables[(rec.table_name, rec.owner)]
						except KeyError:
							# Not one of the tables we've been asked for
							pass
						else:
							yield from do(t2)
					yield table

			for table in tables.values():
				yield from do(table)

	def sequences(self, owner=None):
		"""
		Generator that yields sequences. ``owner`` can be :const:`None` (the
		default), :const:`ALL` or a user name.
		"""
		return Sequence.objects(self, owner)

	def fks(self, owner=None):
		"""
		Generator that yields all foreign key constraints. ``owner`` can be
		:const:`None` (the default), :const:`ALL` or a user name.
		"""
		return ForeignKey.objects(self, owner)

	def privileges(self, owner=None):
		"""
		Generator that yields object privileges. ``owner`` can be :const:`None`
		(the default), :const:`ALL`, a user name or a set or tuple of user
		names.
		"""
		return Privilege.objects(self, owner)

	def synonyms(self, owner=None, object_owner=ALL):
		"""
		Generator that yields synonyms. ``owner`` and ``object_owner`` can be
		:const:`None`, :const:`ALL`, a user name or a set or tuple of user
		names. ``owner`` defaults to :const:`None` and ``object_owner`` to
		:const:`ALL`.
		"""
		# NOTE(review): ``object_owner`` is currently not forwarded, because the
		# signature of ``Synonym.objects`` isn't visible here. Pass it on once
		# it's confirmed that ``Synonym.objects`` accepts it.
		return Synonym.objects(self, owner)

	def users(self):
		"""
		Generator that yields all users.
		"""
		return User.objects(self)

	def objects(self, owner=None, mode="create"):
		"""
		Generator that yields the sequences, tables, primary keys, foreign
		keys, table and columns comments, unique constraints, indexes, views,
		functions, procedures, packages, types and jobs in the current users
		schema (or all users schemas) in a specified order.

		``mode`` specifies the order in which objects will be yielded:

		``"create"``
			Create order, i.e. recreating the objects in this order will not
			lead to errors;

		``"drop"``
			Drop order, i.e. dropping the objects in this order will not lead
			to errors;

		``"flat"``
			Unordered.

		``owner`` specifies from which schema objects should be yielded. For
		more information see :func:`owned`.

		Any other value for ``mode`` raises an :exc:`UnknownModeError`.
		"""
		if mode not in {"create", "drop", "flat"}:
			raise UnknownModeError(mode)

		# Objects that have already been handled (shared by all helpers below)
		done = set()

		def do(obj):
			if mode == "create":
				for subobj in obj.referencesall(self, done):
					if owned(subobj, owner):
						yield subobj
			elif mode == "drop":
				for subobj in obj.referencedbyall(self, done):
					if owned(subobj, owner):
						yield subobj
			else:
				if obj not in done:
					done.add(obj)
					if owned(obj, owner):
						yield obj

		def dosequences():
			for sequence in Sequence.objects(self, owner):
				yield from do(sequence)

		def dotables():
			for table in Table.objects(self, owner):
				if mode == "create" or mode == "flat":
					yield from do(table)

				# Table comment
				yield table.comment()

				# Column comments
				for comment in table.comments():
					# No dependency checks necessary, but use ``do`` anyway
					yield from do(comment)

				# Primary key
				pk = table.pk()
				if pk is not None:
					yield from do(pk)

				if mode == "drop":
					yield from do(table)

		def dorest():
			# Since Oracle doesn't give us any dependency information about jobs,
			# output them last (in "create" mode) to be on the safe side.
			for type in (CheckConstraint, UniqueConstraint, ForeignKey, Preference, Index, Synonym, View, MaterializedView, Function, Procedure, Package, PackageBody, Type, TypeBody, Trigger, JavaSource, Job):
				for obj in type.objects(self, owner):
					yield from do(obj)

		funcs = [dosequences, dotables, dorest]
		if mode == "drop":
			funcs = reversed(funcs)

		for func in funcs:
			yield from func()

	def _getobject(self, name, owner=None):
		# Return the object named ``name`` (matched case sensitively) for the
		# owner ``owner`` (:const:`None` means the current user). Raises
		# :exc:`SQLNoSuchObjectError` if there's no such object and
		# :exc:`TypeError` if its type isn't supported.
		cursor = self.cursor()
		ddprefix = cursor.ddprefix()
		query = f""" select object_name, decode(owner, user, null, owner) as owner, object_type from {ddprefix}_objects where object_name = :object_name and owner = nvl(:owner, user) """
		cursor.execute(query, object_name=name, owner=owner)
		rec = cursor.fetchone()
		if rec is not None:
			type = rec.object_type.lower()
			try:
				cls = SchemaObject.name2type[type]
			except KeyError:
				raise TypeError(f"type {type} not supported")
			else:
				return cls(rec.object_name, rec.owner, self)
		raise SQLNoSuchObjectError(name, owner)

	def objects_named(self, name, owner=None):
		"""
		Return all objects named ``name`` from the schema. If ``owner`` is
		:const:`None` the current schema is queried, else the specified one is
		used. ``name`` and ``owner`` are treated case insensitively.

		If ``name`` contains a ``.`` it is treated as the name of a procedure
		or function in a package (``package.name``).

		There might be multiple objects with the same name, if these names only
		differ in casing. Also there *will* be multiple objects with the same
		name for packages and package bodies.

		If an object has a type that is not supported a :exc:`TypeError` is
		raised.
		"""
		if isinstance(name, str):
			name = str(name)
		if isinstance(owner, str):
			owner = str(owner)
		cursor = self.cursor()
		ddprefix = cursor.ddprefix()
		if "." in name:
			name = name.split(".")
			query = f""" select decode(owner, user, null, owner) as owner, object_name || '.' || procedure_name as object_name, decode( ( select count(*) from {ddprefix}_arguments where owner = nvl(:owner, user) and lower(object_name) = lower(:object_name) and lower(package_name) = lower(:package_name) and argument_name is null ), 0, 'procedure', 'function' ) as object_type from {ddprefix}_procedures where lower(procedure_name) = lower(:object_name) and lower(owner) = lower(nvl(:owner, user)) and lower(object_name) = lower(:package_name) """
			cursor.execute(query, object_name=name[1], package_name=name[0], owner=owner)
		else:
			query = f""" select object_name, decode(owner, user, null, owner) as owner, object_type from {ddprefix}_objects where lower(object_name) = lower(:object_name) and lower(owner) = lower(nvl(:owner, user)) """
			cursor.execute(query, object_name=name, owner=owner)

		for rec in cursor:
			type = rec.object_type.lower()
			try:
				cls = SchemaObject.name2type[type]
			except KeyError:
				raise TypeError(f"type {type} not supported")
			else:
				yield cls(rec.object_name, rec.owner, self)

	def object_named(self, name, owner=None):
		"""
		Return the object named ``name`` from the schema. If ``owner`` is
		:const:`None` the current schema is queried, else the specified one is
		used. ``name`` and ``owner`` are treated case insensitively.

		If there are multiple objects with the same name, which one gets
		returned is undefined.

		If there is no such object an :exc:`SQLNoSuchObjectError` exception
		will be raised.
		"""
		result = misc.first(self.objects_named(name, owner))
		if result is None:
			raise SQLNoSuchObjectError(name, owner)
		return result

	# For backwards compatibility
	def getobject(self, name, owner=None):
		return self.object_named(name, owner)
def connect(*args, **kwargs):
	"""
	Create a connection to the database and return a :class:`Connection`
	object. All arguments are passed on to :class:`Connection`.
	"""
	connection = Connection(*args, **kwargs)
	return connection
class Cursor(Cursor):
	"""
	A subclass of the cursor class in :mod:`cx_Oracle`. The "fetch" methods
	will return records as :class:`Record` objects and ``LOB`` values will be
	returned as :class:`LOBStream` objects or :class:`str`/:class:`bytes`
	objects (depending on the cursors :attr:`readlobs` attribute).
	"""

	def __init__(self, connection, readlobs=None):
		"""
		Return a new cursor for the connection ``connection``. For the meaning
		of ``readlobs`` see :meth:`Connection.__init__`. If ``readlobs`` is
		:const:`None` the setting of the connection is used.
		"""
		super().__init__(connection)
		if readlobs is None:
			readlobs = connection.readlobs
		self.readlobs = readlobs

	def _dbaorall(self, query):
		# Execute ``query`` (which selects from a ``DBA_*`` view) and return
		# ``"dba"`` if that works or ``"all"`` if the view is not accessible.
		# Any other database error is passed on.
		try:
			self.execute(query)
		except DatabaseError as exc:
			if exc.args[0].code != 942: # ORA-00942: table or view does not exist
				raise
			return "all"
		return "dba"

	def ddprefix(self):
		"""
		Return whether the user has access to the ``DBA_*`` views (``"dba"``)
		or not (``"all"``). The result is cached on the connection.
		"""
		if self.connection._ddprefix is None:
			self.connection._ddprefix = self._dbaorall("select /*+FIRST_ROWS(1)*/ table_name from dba_tables")
		return self.connection._ddprefix

	def ddprefixargs(self):
		"""
		Return whether the user has access to the ``DBA_ARGUMENTS`` view
		(``"dba"``) or not (``"all"``). The result is cached on the connection.
		"""
		# This method is separate from :meth:`ddprefix`, because Oracle 10 doesn't
		# have a ``DBA_ARGUMENTS`` view.
		if self.connection._ddprefixargs is None:
			self.connection._ddprefixargs = self._dbaorall("select /*+FIRST_ROWS(1)*/ object_name from dba_arguments")
		return self.connection._ddprefixargs

	def execute(self, statement, parameters=None, **kwargs):
		# Only pass ``parameters`` on when they have been specified
		args = (statement,) if parameters is None else (statement, parameters)
		result = super().execute(*args, **kwargs)
		# If the statement produces rows, return them as :class:`Record` objects
		if self.description is not None:
			self.rowfactory = RecordMaker(self)
		return result

	def executemany(self, statement, parameters):
		result = super().executemany(statement, parameters)
		# If the statement produces rows, return them as :class:`Record` objects
		if self.description is not None:
			self.rowfactory = RecordMaker(self)
		return result

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__} statement={self.statement!r} at {id(self):#x}>"
###
### Classes used for database metadata
###
class MixinNormalDates:
	"""
	Mixin class that provides methods for determining creation and
	modification dates for objects.

	Both dates are fetched from the ``*_OBJECTS`` data dictionary view and are
	converted to UTC by the query.
	"""

	def _objectdate(self, connection, column):
		# Fetch the date column ``column`` for ``self`` from the ``*_OBJECTS``
		# view. Raises :exc:`SQLObjectNotFoundError` if there's no row for
		# ``self``.
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select sys_extract_utc(from_tz(cast({column} as timestamp), dbtimezone)) from {ddprefix}_objects where lower(object_type) = :type and object_name = :name and owner = nvl(:owner, user) """
		cursor.execute(query, type=self.__class__.type, name=self.name, owner=self.owner)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		return row[0]

	def cdate(self, connection=None):
		# Creation date of ``self``
		return self._objectdate(connection, "created")

	def udate(self, connection=None):
		# Date of the last DDL change of ``self``
		return self._objectdate(connection, "last_ddl_time")
class MixinCodeSQL:
	"""
	Mixin class that provides methods returning the create and drop statements
	for various objects.

	The source code is fetched from the ``*_SOURCE`` data dictionary view.
	"""

	def exists(self, connection=None, term=True):
		# Is there any source code for ``self``? (``term`` is not used)
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select 1 from {ddprefix}_source where type = :type and owner = nvl(:owner, user) and name = :name """
		cursor.execute(query, type=self.__class__.type.upper(), owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		# Return the ``create or replace`` statement for ``self`` (or an empty
		# string if there's no source code). If ``term`` is true the statement
		# is terminated with a ``/`` line.
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select text from {ddprefix}_source where type = :type and owner = nvl(:owner, user) and name = :name order by line """
		cursor.execute(query, type=self.__class__.type.upper(), owner=self.owner, name=self.name)
		# sqlplus strips trailing spaces when executing SQL scripts, so we do that too
		lines = [(rec.text or "").rstrip() for rec in cursor]
		code = "\n".join(lines)
		if not code:
			return ""
		# compress "PROCEDURE FOO"
		code = " ".join(code.split(None, 1)).strip()
		typename = self.__class__.type
		# drop "procedure" etc.
		start = code.lower().find(typename) + len(typename)
		code = code[start:].strip()
		# drop our own name (for triggers this includes the schema name)
		if code.startswith('"'):
			code = code[code.find('"', 1)+1:]
		else:
			skip = 0
			while skip < len(code) and (code[skip].isalnum() or code[skip] in "_$."):
				skip += 1
			code = code[skip:]
		# NOTE(review): all leading spaces are removed here and the rest is
		# appended directly after the name below. Confirm that this gives the
		# intended spacing for source code like ``procedure foo is``.
		code = code.lstrip(" ")
		terminator = "\n/\n" if term else "\n"
		return f"create or replace {typename} {self.getfullname()}{code}\n{terminator}"

	def dropsql(self, connection=None, term=True):
		# Return the ``drop`` statement for ``self``. If ``term`` is true the
		# statement is terminated with ``;``.
		name = self.name if self.owner is None else f"{self.owner}.{self.name}"
		terminator = ";\n" if term else "\n"
		return f"drop {self.__class__.type} {name}{terminator}"

	def fixname(self, code):
		# Replace the object name in the ``create or replace`` statement
		# ``code`` with the full name of ``self``. Empty code is returned
		# unchanged.
		if not code:
			return code
		parts = code.split(None, 5)
		return f"create or replace {parts[3]} {self.getfullname()}\n{parts[5]}"
def getfullname(name, owner):
	# Return the full SQL name for the object named ``name`` belonging to
	# ``owner`` (which may be :const:`None`). ``name`` may consist of several
	# parts separated by ``.``. Each part (and the owner) is put in double
	# quotes, if it isn't all uppercase or contains characters other than
	# alphanumerics and ``_``.
	def quote(part):
		if part != part.upper() or not all(c.isalnum() or c == "_" for c in part):
			return f'"{part}"'
		return part

	parts = []
	if owner is not None:
		# The owner has to be quoted in the same way as the name parts
		parts.append(quote(owner))
	parts.extend(quote(part) for part in name.split("."))
	return ".".join(parts)


class _SchemaObject_meta(type):
	# Metaclass that registers each class that defines a ``type`` attribute in
	# ``SchemaObject.name2type`` under that type name (except for the base
	# classes ``SchemaObject`` and ``OwnedSchemaObject`` themselves).
	def __new__(mcl, name, bases, dict):
		typename = None
		if "type" in dict and name not in {"SchemaObject", "OwnedSchemaObject"}:
			typename = dict["type"]
		cls = type.__new__(mcl, name, bases, dict)
		if typename is not None:
			SchemaObject.name2type[typename] = cls
		return cls
[docs]class SchemaObject(object, metaclass=_SchemaObject_meta): """ The base class for all Python classes modelling schema objects in the database. """ name2type = {} # maps the Oracle type name to the Python class (populated by the metaclass) def __init__(self, name, connection=None): self.name = name self.connection = connection def __repr__(self): if self.connection is not None: return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} connection={self.connectstring!r} at {id(self):#x}>" else: return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} at {id(self):#x}>" def __str__(self): return f"{self.type} {self.name}" def __eq__(self, other): return self.__class__ is other.__class__ and self.name == other.name def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(self.__class__.__name__) ^ hash(self.name) def getfullname(self): return getfullname(self.name, None)
[docs] @misc.notimplemented def createsql(self, connection=None, term=True): """ Return SQL code to create this object. """
[docs] @misc.notimplemented def dropsql(self, connection=None, term=True): """ Return SQL code to drop this object """
[docs] @misc.notimplemented def fixname(self, code): """ Replace the name of the object in the SQL code ``code`` with the name of ``self``. """
[docs] @misc.notimplemented def exists(self, connection=None): """ Return whether the object ``self`` really exists in the database specified by ``connection``. """
[docs] def cdate(self, connection=None): """ Return a :class:`datetime.datetime` object with the creation date of ``self`` in the database specified by ``connection`` (or :const:`None` if that information is not available). """ return None
[docs] def udate(self, connection=None): """ Return a :class:`datetime.datetime` object with the last modification date of ``self`` in the database specified by ``connection`` (or :const:`None` if that information is not available). """ return None
[docs] def references(self, connection=None, done=None): """ Objects directly used by ``self``. If ``connection`` is not :const:`None` it will be used as the database connection from which to fetch data. If ``connection`` is :const:`None` the connection from which ``self`` has been extracted will be used. If there is not such connection, you'll get an exception. """ yield from ()
[docs] def referencesall(self, connection=None, done=None): """ All objects used by ``self`` (recursively). For the meaning of ``connection`` see :meth:`references`. ``done`` is used internally and shouldn't be passed. """ if done is None: done = set() if self not in done: done.add(self) for obj in self.references(connection): yield from obj.referencesall(connection, done) yield self
[docs] def referencedby(self, connection=None): """ Objects using ``self``. For the meaning of ``connection`` see :meth:`references`. """ yield from ()
[docs] def referencedbyall(self, connection=None, done=None): """ All objects depending on ``self`` (recursively). For the meaning of ``connection`` see :meth:`references`. ``done`` is used internally and shouldn't be passed. """ if done is None: done = set() if self not in done: done.add(self) for obj in self.referencedby(connection): yield from obj.referencedbyall(connection, done) yield self
def getconnection(self, connection):
	"""
	Return the connection to work with.

	``connection`` itself is returned unless it is :const:`None`; in that
	case ``self.connection`` is used. If that is :const:`None` too, a
	:exc:`TypeError` is raised.
	"""
	if connection is not None:
		return connection
	fallback = self.connection
	if fallback is None:
		raise TypeError("no connection available")
	return fallback

def getcursor(self, connection):
	"""
	Return a ``(connection, cursor)`` pair, where the connection is
	resolved via :meth:`getconnection` and the cursor is freshly created
	from it.
	"""
	resolved = self.getconnection(connection)
	cursor = resolved.cursor()
	return (resolved, cursor)

def getconnectstring(self):
	"""
	Return the connect string of ``self.connection`` (or :const:`None` if
	``self.connection`` is false).
	"""
	return self.connection.connectstring() if self.connection else None

# read-only attribute access to :meth:`getconnectstring`
connectstring = property(getconnectstring)
class OwnedSchemaObject(SchemaObject):
	"""
	Base class for schema objects that are identified by a name and an owner.

	An owner of :const:`None` stands for the current user: the queries in
	this class bind it through ``nvl(:owner, user)``.
	"""

	name2type = {} # maps the Oracle type name to the Python class (populated by the metaclass)

	def __init__(self, name, owner=None, connection=None):
		"""
		Remember ``name``, ``owner`` and ``connection`` as attributes.
		"""
		self.name = name
		self.owner = owner
		self.connection = connection

	def __repr__(self):
		# Without an owner the inherited representation is sufficient
		if self.owner is None:
			return super().__repr__()
		if self.connection is None:
			return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} owner={self.owner!r} at {id(self):#x}>"
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} owner={self.owner!r} connection={self.connectstring!r} at {id(self):#x}>"

	def __str__(self):
		if self.owner is None:
			return super().__str__()
		return f"{self.type} {self.name} @ {self.owner}"

	def __eq__(self, other):
		# Objects are equal when class, name and owner agree; the connection is ignored
		if self.__class__ is not other.__class__:
			return False
		return self.name == other.name and self.owner == other.owner

	def __ne__(self, other):
		return not self.__eq__(other)

	def __hash__(self):
		# Built from the same three items that :meth:`__eq__` compares
		return hash(self.__class__.__name__) ^ hash(self.name) ^ hash(self.owner)

	def getfullname(self):
		"""
		Return the result of the module level function :func:`getfullname`
		for the name and owner of ``self``.
		"""
		return getfullname(self.name, self.owner)

	def references(self, connection=None):
		"""
		Generator for the objects ``self`` uses directly, taken from the
		``*_DEPENDENCIES`` view. Rows whose referenced type has no entry in
		``SchemaObject.name2type`` are silently skipped.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select referenced_type, decode(referenced_owner, user, null, referenced_owner) as referenced_owner, referenced_name from {ddprefix}_dependencies where type=upper(:type) and name=:name and owner=nvl(:owner, user) and type != 'NON-EXISTENT' order by referenced_owner, referenced_name """
		cursor.execute(query, type=self.type, name=self.name, owner=self.owner)
		for rec in cursor.fetchall():
			try:
				objtype = SchemaObject.name2type[rec.referenced_type.lower()]
			except KeyError:
				continue # FIXME: Issue a warning?
			yield objtype(rec.referenced_name, rec.referenced_owner, connection)

	def referencedby(self, connection=None):
		"""
		Generator for the objects that use ``self`` directly, taken from the
		``*_DEPENDENCIES`` view. Rows whose type has no entry in
		``SchemaObject.name2type`` are silently skipped.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select type, decode(owner, user, null, owner) as owner, name from {ddprefix}_dependencies where referenced_type = :type and referenced_name = :name and referenced_owner = nvl(:owner, user) and type != 'NON-EXISTENT' order by owner, name """
		cursor.execute(query, type=self.type.upper(), name=self.name, owner=self.owner)
		for rec in cursor.fetchall():
			try:
				objtype = SchemaObject.name2type[rec.type.lower()]
			except KeyError:
				continue # FIXME: Issue a warning?
			yield objtype(rec.name, rec.owner, connection)

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` pairs for all objects of this
		type, in ascending order.

		``owner`` selects whose objects are yielded: :const:`None` uses
		``USER_OBJECTS``, :const:`ALL` applies no owner filter, a string
		selects exactly this owner and anything else is treated as an
		iterable of owner names. For more information see :func:`owned`.
		"""
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		params = {"type": cls.type.upper()}
		if owner is None:
			query = """ select null as owner, object_name from user_objects where object_type = :type and object_name not like 'BIN$%' and object_name not like 'DR$%' order by object_name """
		elif owner is ALL:
			query = f""" select decode(owner, user, null, owner) as owner, object_name from {ddprefix}_objects where object_type = :type and object_name not like 'BIN$%' and object_name not like 'DR$%' order by owner, object_name """
		elif isinstance(owner, str):
			query = f""" select decode(owner, user, null, owner) as owner, object_name from {ddprefix}_objects where object_type = :type and object_name not like 'BIN$%' and object_name not like 'DR$%' and owner = :owner order by owner, object_name """
			params["owner"] = owner
		else:
			query = f""" select decode(owner, user, null, owner) as owner, object_name from {ddprefix}_objects where object_type = :type and object_name not like 'BIN$%' and object_name not like 'DR$%' and owner in ({', '.join(sqlstr(o) for o in owner)}) order by owner, object_name """
		cursor.execute(query, **params)
		for row in cursor:
			yield (row.object_name, row.owner)

	@classmethod
	def objects(cls, connection, owner=None):
		"""
		Generator that yields an instance of this class for every pair
		produced by :meth:`names`. ``owner`` is passed on to :meth:`names`
		unchanged. For more information see :func:`owned`.
		"""
		for (objname, objowner) in cls.names(connection, owner):
			yield cls(objname, objowner, connection)

	def synonyms(self, connection=None):
		"""
		Generator that yields a :class:`Synonym` object for every synonym
		defined for this object.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, synonym_name from {ddprefix}_synonyms where table_owner=nvl(:owner, user) and table_name=:name """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor:
			yield Synonym(rec.synonym_name, rec.owner, connection)

	def privileges(self, connection=None):
		"""
		Generator that yields a :class:`Privilege` object for every privilege
		granted on this object.
		"""
		(connection, cursor) = self.getcursor(connection)
		params = {"name": self.name}
		if self.owner is None:
			query = """ select decode(owner, user, null, owner) as owner, privilege, table_name as object, decode(grantor, user, null, grantor) as grantor, grantee from user_tab_privs where owner = user and table_name = :name order by grantor, privilege, grantee """
		else:
			params["owner"] = self.owner
			# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
			if cursor.ddprefix() == "all":
				query = """ select decode(table_schema, user, null, table_schema) as owner, privilege, table_name as object, decode(grantor, user, null, grantor) as grantor, grantee from all_tab_privs where table_schema = nvl(:owner, user) and table_name = :name order by grantor, table_name, privilege, grantee """
			else:
				query = """ select decode(owner, user, null, owner) as owner, privilege, table_name as object, decode(grantor, user, null, grantor) as grantor, grantee from dba_tab_privs where owner = nvl(:owner, user) and table_name = :name order by grantor, table_name, privilege, grantee """
		cursor.execute(query, **params)
		for rec in cursor:
			yield Privilege(rec.privilege, rec.object, rec.grantor, rec.grantee, rec.owner, connection)
class Sequence(MixinNormalDates, OwnedSchemaObject):
	"""
	Models a sequence in the database.
	"""

	type = "sequence"

	def _createsql(self, connection, term, copyvalue):
		"""
		Build the ``create sequence`` statement for this sequence.

		With ``copyvalue`` true the new sequence starts one increment after
		the ``last_number`` of the existing one, otherwise it starts at the
		minimum value. ``term`` decides whether the statement is terminated
		with ``;``. Raises :exc:`SQLObjectNotFoundError` if the sequence
		isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select * from {ddprefix}_sequences where sequence_owner = nvl(:owner, user) and sequence_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		start = rec.last_number + rec.increment_by if copyvalue else rec.min_value
		cycle = "" if rec.cycle_flag == "Y" else "no"
		order = "" if rec.order_flag == "Y" else "no"
		parts = [
			f"create sequence {self.getfullname()}\n",
			f"\tincrement by {rec.increment_by}\n",
			f"\tstart with {start}\n",
			f"\tmaxvalue {rec.max_value}\n",
			f"\tminvalue {rec.min_value}\n",
			f"\t{cycle}cycle\n",
			f"\tcache {rec.cache_size}\n" if rec.cache_size else "\tnocache\n",
			f"\t{order}order",
			";\n" if term else "\n",
		]
		return "".join(parts)

	def exists(self, connection=None):
		"""
		Return whether a sequence with this name and owner is present in the
		database.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select 1 from {ddprefix}_sequences where sequence_owner = nvl(:owner, user) and sequence_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return SQL code to create this sequence, starting at its minimum
		value.
		"""
		return self._createsql(connection, term, False)

	def createsqlcopy(self, connection=None, term=True):
		"""
		Return SQL code to create a copy of this sequence that continues
		after the current value of the existing one.
		"""
		return self._createsql(connection, term, True)

	def dropsql(self, connection=None, term=True):
		"""
		Return SQL code to drop this sequence.
		"""
		terminator = ";\n" if term else "\n"
		return f"drop sequence {self.getfullname()}{terminator}"

	def fixname(self, code):
		"""
		Return ``code`` with its first three whitespace separated words
		replaced by ``create sequence`` and the full name of ``self``.
		"""
		rest = code.split(None, 3)[3]
		return f"create sequence {self.getfullname()}\n{rest}"

	def references(self, connection=None, done=None):
		# Shortcut: a sequence doesn't depend on anything
		yield from ()
def _columntype(rec, data_precision=None, data_scale=None, char_length=None): ftype = rec.data_type.lower() if data_precision is None: data_precision = rec.data_precision if data_scale is None: data_scale = rec.data_scale if char_length is None: char_length = rec.char_length fsize = data_precision fprec = data_scale if ftype == "number" and fprec == 0 and fsize is None: ftype = "integer" elif ftype == "number" and fprec is None and fsize is None: ftype = "number" elif ftype == "number" and fprec == 0: ftype = f"number({fsize})" elif ftype == "number": ftype = f"number({fsize}, {fprec})" elif ftype == "raw": ftype = f"raw({rec.data_length})" else: if char_length != 0: fsize = char_length if fsize is not None: ftype += f"({fsize}" if rec.char_used == "B": ftype += " byte" elif rec.char_used == "C": ftype += " char" if fprec is not None: ftype += f", {fprec}" ftype += ")" return ftype def _columndefault(rec): if rec.data_default is not None and rec.data_default.strip() != "null": return rec.data_default.strip() return "null"
class Table(MixinNormalDates, OwnedSchemaObject):
	"""
	Models a table in the database.
	"""

	type = "table"

	def createsql(self, connection=None, term=True):
		"""
		Return SQL code to create this table (including an inline primary
		key, ``nologging``, table compression and LOB compression).

		For a table that belongs to a materialized view the empty string is
		returned. ``term`` decides whether the statement is terminated with
		``;``. Raises :exc:`SQLObjectNotFoundError` (via :meth:`_info`) if
		the table isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		if self.ismview(connection):
			return ""
		ddprefix = cursor.ddprefix()
		# Find the fields that were used for an inline primary key constraint,
		# as we want to regenerate it as part of the create table statement
		query = f""" select column_name from {ddprefix}_constraints c, {ddprefix}_cons_columns cc where c.constraint_type = 'P' and c.generated = 'GENERATED NAME' and c.owner = nvl(:owner, user) and c.table_name = :name and c.constraint_name = cc.constraint_name """
		cursor.execute(query, owner=self.owner, name=self.name)
		_inlinepkfields = {rec.column_name for rec in cursor}
		# The organization isn't needed for the generated statement
		(_, logging, compression) = self._info(connection)
		query = f""" select c.column_name, c.data_type, c.data_precision, c.data_scale, c.char_length, c.char_used, c.data_default, c.nullable, decode(l.compression, 'NO', null, 'NONE', null, l.compression) as compression from {ddprefix}_tab_columns c, {ddprefix}_lobs l where c.owner = nvl(:owner, user) and c.table_name = :name and c.owner = l.owner(+) and c.table_name = l.table_name(+) and c.column_name = l.column_name(+) order by column_id asc """
		cursor.execute(query, owner=self.owner, name=self.name)
		recs = cursor.fetchall()
		code = [f"create table {self.getfullname()}\n(\n"]
		for (i, rec) in enumerate(recs):
			if i:
				code.append(",\n")
			code.append(f"\t{getfullname(rec.column_name, None)} {_columntype(rec)}")
			default = _columndefault(rec)
			if default != "null":
				code.append(f" default {default}")
			if rec.nullable == "N":
				code.append(" not null")
			if rec.column_name in _inlinepkfields:
				code.append(" primary key")
		code.append("\n)")
		if not logging:
			code.append(" nologging")
		if compression is not None:
			if compression == "BASIC":
				code.append(" compress")
			elif compression == "ADVANCED":
				code.append(" row store compress advanced")
			else:
				# The leading space keeps this clause separate from the preceding ``)`` or ``nologging``
				code.append(f" column store compress for {compression}")
		for rec in recs:
			if rec.compression:
				code.append(f"\nlob({rec.column_name}) store as securefile (compress {rec.compression.lower()})")
		if term:
			code.append(";")
		code.append("\n")
		return "".join(code)

	def exists(self, connection=None):
		"""
		Return whether this table is present in the database. A table that
		belongs to a materialized view is reported as not existing.
		"""
		(connection, cursor) = self.getcursor(connection)
		if self.ismview(connection):
			return False
		ddprefix = cursor.ddprefix()
		query = f""" select 1 from {ddprefix}_tables where owner = nvl(:owner, user) and table_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		return rec is not None

	def dropsql(self, connection=None, term=True):
		"""
		Return SQL code to drop this table (the empty string for a table that
		belongs to a materialized view).
		"""
		if self.ismview(connection):
			return ""
		code = f"drop table {self.getfullname()}"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def fixname(self, code):
		"""
		Return ``code`` with its first three whitespace separated words
		replaced by ``create table`` and the full name of ``self``.
		"""
		code = code.split(None, 3)
		code = f"create table {self.getfullname()}\n{code[3]}"
		return code

	def mview(self, connection=None):
		"""
		The materialized view this table belongs to (or :const:`None` if it's
		a real table).
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select mview_name from {ddprefix}_mviews where owner = nvl(:owner, user) and mview_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is not None:
			rec = MaterializedView(self.name, self.owner, connection)
		return rec

	def ismview(self, connection=None):
		"""
		Is this table a materialized view?
		"""
		return self.mview(connection) is not None

	def _info(self, connection=None):
		"""
		Return the tuple ``(organization, logging, compression)`` for this
		table: ``"heap"`` or ``"index"``, a bool that is true when the
		``logging`` column is ``"YES"`` and the value of the ``compress_for``
		column. Raises :exc:`SQLObjectNotFoundError` if the table isn't
		found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select logging, iot_type, compress_for from {ddprefix}_tables where owner = nvl(:owner, user) and table_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		return ("heap" if rec.iot_type is None else "index", rec.logging == "YES", rec.compress_for)

	def organization(self, connection=None):
		"""
		Return the organization of this table: either ``"heap"`` (for
		"normal" tables) or ``"index"`` (for index organized tables).
		"""
		return self._info(connection)[0]

	def logging(self, connection=None):
		"""
		Return whether the table is in logging mode or not.
		"""
		return self._info(connection)[1]

	def compression(self, connection=None):
		"""
		Return the compression mode of the table, i.e. the value of the
		``compress_for`` column: :const:`None`, ``"BASIC"``, ``"ADVANCED"``
		or whatever else the database reports there.
		"""
		return self._info(connection)[2]

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` pairs for all tables, in
		ascending order. Tables that are materialized views are skipped.

		``owner`` selects whose tables are yielded: :const:`None` uses
		``USER_TABLES``, :const:`ALL` applies no owner filter, a string
		selects exactly this owner and anything else is treated as an
		iterable of owner names.
		"""
		# Skip tables that are materialized views
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		if owner is None:
			query = """ select null as owner, table_name from user_tables where table_name not like 'BIN$%' and table_name not like 'DR$%' minus select null as owner, mview_name as table_name from user_mviews order by table_name """
			cursor.execute(query)
		elif owner is ALL:
			query = f""" select decode(owner, user, null, owner) as owner, table_name from {ddprefix}_tables where table_name not like 'BIN$%' and table_name not like 'DR$%' minus select decode(owner, user, null, owner) as owner, mview_name as table_name from {ddprefix}_mviews order by owner, table_name """
			cursor.execute(query)
		elif isinstance(owner, str):
			# Both halves of the ``minus`` have to be restricted to the requested owner;
			# this also makes sure the ``owner`` bind variable really occurs in the statement
			query = f""" select decode(owner, user, null, owner) as owner, table_name from {ddprefix}_tables where table_name not like 'BIN$%' and table_name not like 'DR$%' and owner = :owner minus select decode(owner, user, null, owner) as owner, mview_name as table_name from {ddprefix}_mviews where owner = :owner order by owner, table_name """
			cursor.execute(query, owner=owner)
		else:
			# Build the list once, so that ``owner`` may be a one-shot iterator
			owners = ', '.join(sqlstr(o) for o in owner)
			query = f""" select decode(owner, user, null, owner) as owner, table_name from {ddprefix}_tables where table_name not like 'BIN$%' and table_name not like 'DR$%' and owner in ({owners}) minus select decode(owner, user, null, owner) as owner, mview_name as table_name from {ddprefix}_mviews where owner in ({owners}) order by owner, table_name """
			cursor.execute(query)
		for row in cursor:
			yield (row.table_name, row.owner)

	def columns(self, connection=None):
		"""
		Generator that yields all column objects of this table.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select column_name from {ddprefix}_tab_columns where owner=nvl(:owner, user) and table_name=:name order by column_id """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor:
			yield Column(f"{self.name}.{rec.column_name}", self.owner, connection)

	def records(self, connection=None):
		"""
		Return an iterator over all records of this table.
		"""
		(connection, cursor) = self.getcursor(connection)
		query = f"select * from {self.getfullname()}"
		cursor.execute(query)
		return iter(cursor)

	def comment(self, connection=None):
		"""
		Return the table comment (as a :class:`TableComment` object).
		"""
		return TableComment(self.name, self.owner, self.getconnection(connection))

	def comments(self, connection=None):
		"""
		Generator that yields all column comments of this table (as
		:class:`ColumnComment` objects).
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select column_name from {ddprefix}_tab_columns where owner = nvl(:owner, user) and table_name = :name order by column_id """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor:
			yield ColumnComment(f"{self.name}.{rec.column_name}", self.owner, connection)

	def _iterconstraints(self, connection, cond):
		"""
		Generator that yields the user named constraints of this table whose
		constraint type matches the SQL condition ``cond`` (e.g. ``"= 'P'"``).
		``cond`` is inserted into the query text verbatim.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, constraint_type, constraint_name from {ddprefix}_constraints where generated = 'USER NAME' and constraint_type {cond} and owner = nvl(:owner, user) and table_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		types = {"P": PrimaryKey, "U": UniqueConstraint, "R": ForeignKey, "C": CheckConstraint}
		for rec in cursor:
			yield types[rec.constraint_type](rec.constraint_name, rec.owner, connection)

	def constraints(self, connection=None):
		"""
		Generator that yields all constraints for this table.
		"""
		return self._iterconstraints(connection, "in ('P', 'U', 'R', 'C')")

	def pk(self, connection=None):
		"""
		Return the primary key constraint for this table (or :const:`None` if
		the table has no primary key constraint).
		"""
		return misc.first(self._iterconstraints(connection, "= 'P'"), None)

	def fks(self, connection=None):
		"""
		Return the foreign key constraints for this table.
		"""
		return self._iterconstraints(connection, "= 'R'")

	def uniques(self, connection=None):
		"""
		Return the unique constraints for this table.
		"""
		return self._iterconstraints(connection, "= 'U'")

	def checks(self, connection=None):
		"""
		Return the check constraints for this table.
		"""
		return self._iterconstraints(connection, "= 'C'")

	def references(self, connection=None):
		connection = self.getconnection(connection)
		# A table doesn't depend on anything ...
		mview = self.mview(connection)
		if mview is not None:
			# ... unless it was created by a materialized view, in which case it depends on the view
			yield mview

	def referencedby(self, connection=None):
		if not self.ismview(connection):
			yield self.comment(connection)
			yield from self.comments(connection)
			yield from self.constraints(connection)
		for obj in super().referencedby(connection):
			# skip the materialized view
			if not isinstance(obj, MaterializedView) or obj.name != self.name or obj.owner != self.owner:
				yield obj
class TableComment(OwnedSchemaObject):
	"""
	Models a table comment in the database.
	"""

	type = "tablecomment"

	def exists(self, connection=None):
		"""
		Return whether the ``*_TAB_COMMENTS`` view contains a row for this
		table.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select comments from {ddprefix}_tab_comments where owner=nvl(:owner, user) and table_name=:name """
		cursor.execute(query, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def comment(self, connection=None):
		"""
		Return the comment text for this table. Raises
		:exc:`SQLObjectNotFoundError` if there's no row for the table.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select comments from {ddprefix}_tab_comments where owner = nvl(:owner, user) and table_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		return row.comments

	def createsql(self, connection=None, term=True):
		"""
		Return a ``comment on table`` statement for this comment. Single
		quotes in the text are doubled and a missing text is treated as the
		empty string.
		"""
		text = self.comment(connection) or ""
		escaped = text.replace("'", "''")
		terminator = ";\n" if term else "\n"
		return f"comment on table {self.getfullname()} is '{escaped}'{terminator}"

	def dropsql(self, connection=None, term=True):
		# will be dropped with the table
		return ""

	def fixname(self, code):
		"""
		Return ``code`` with its first five whitespace separated words
		replaced by ``comment on table``, the full name of ``self`` and
		``is``.
		"""
		rest = code.split(None, 5)[5]
		return f"comment on table {self.getfullname()} is {rest}"

	def references(self, connection=None):
		# The comment depends on the table with the same name and owner
		yield Table(self.name, self.owner, self.getconnection(connection))
class ColumnComment(OwnedSchemaObject):
	"""
	Models a column comment in the database.

	The name of a column comment has the form ``table.column``; the methods
	split it at the dots and use the first two parts.
	"""

	type = "columncomment"

	def exists(self, connection=None):
		"""
		Return whether the ``*_COL_COMMENTS`` view contains a row for this
		column.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		parts = self.name.split(".")
		query = f""" select comments from {ddprefix}_col_comments where owner=nvl(:owner, user) and table_name=:tname and column_name=:cname """
		cursor.execute(query, owner=self.owner, tname=parts[0], cname=parts[1])
		return cursor.fetchone() is not None

	def comment(self, connection=None):
		"""
		Return the comment text for this column. Raises
		:exc:`SQLObjectNotFoundError` if there's no row for the column.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		parts = self.name.split(".")
		query = f""" select comments from {ddprefix}_col_comments where owner = nvl(:owner, user) and table_name = :tname and column_name = :cname """
		cursor.execute(query, owner=self.owner, tname=parts[0], cname=parts[1])
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		return row.comments

	def createsql(self, connection=None, term=True):
		"""
		Return a ``comment on column`` statement for this comment. Single
		quotes in the text are doubled and a missing text is treated as the
		empty string.
		"""
		text = self.comment(connection) or ""
		escaped = text.replace("'", "''")
		terminator = ";\n" if term else "\n"
		return f"comment on column {self.getfullname()} is '{escaped}'{terminator}"

	def dropsql(self, connection=None, term=True):
		# will be dropped with the table
		return ""

	def fixname(self, code):
		"""
		Return ``code`` with its first five whitespace separated words
		replaced by ``comment on column``, the full name of ``self`` and
		``is``.
		"""
		rest = code.split(None, 5)[5]
		return f"comment on column {self.getfullname()} is {rest}"

	def references(self, connection=None):
		# The comment depends on the table named by the part before the first dot
		tablename = self.name.split(".")[0]
		yield Table(tablename, self.owner, self.getconnection(connection))
class Constraint(OwnedSchemaObject):
	"""
	Base class of all constraints (primary key constraints, foreign key
	constraints, unique constraints and check constraints).

	The methods read the class attribute ``constraint_type``, which is not
	set here; subclasses such as :class:`PrimaryKey` define it.
	"""

	def exists(self, connection=None):
		"""
		Return whether a constraint with this type, name and owner is
		present in the database.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select 1 from {ddprefix}_constraints where constraint_type = :type and constraint_name = :name and owner = nvl(:owner, user) """
		cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
		rec = cursor.fetchone()
		return rec is not None

	def cdate(self, connection=None):
		"""
		Always return :const:`None`, as only a change date is available for
		constraints. Raises :exc:`SQLObjectNotFoundError` if the constraint
		isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select sys_extract_utc(from_tz(cast(last_change as timestamp), dbtimezone)) from {ddprefix}_constraints where constraint_type = :type and constraint_name = :name and owner = nvl(:owner, user) """
		cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		return None # we can't give a create date, only a change date, so return :const:`None` here

	def udate(self, connection=None):
		"""
		Return the ``last_change`` date of this constraint, converted to UTC
		by the query. Raises :exc:`SQLObjectNotFoundError` if the constraint
		isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select sys_extract_utc(from_tz(cast(last_change as timestamp), dbtimezone)) from {ddprefix}_constraints where constraint_type=:type and constraint_name=:name and owner=nvl(:owner, user) """
		cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		return rec[0]

	def _sql(self, connection, term, command):
		"""
		Return the statement ``alter table ... <command> constraint ...`` for
		this constraint. ``term`` decides whether it is terminated with
		``;``. Raises :exc:`SQLObjectNotFoundError` if the constraint isn't
		found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select table_name from {ddprefix}_constraints where constraint_type = :type and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		tablename = getfullname(rec.table_name, self.owner)
		checkname = getfullname(self.name, None)
		code = f"alter table {tablename} {command} constraint {checkname}"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def dropsql(self, connection=None, term=True):
		"""
		Return SQL code to drop this constraint.
		"""
		return self._sql(connection, term, "drop")

	def enablesql(self, connection=None, term=True):
		"""
		Return SQL code to enable this constraint.
		"""
		return self._sql(connection, term, "enable")

	def disablesql(self, connection=None, term=True):
		"""
		Return SQL code to disable this constraint.
		"""
		return self._sql(connection, term, "disable")

	def isenabled(self, connection=None):
		"""
		Return whether this constraint is enabled. Raises
		:exc:`SQLObjectNotFoundError` if the constraint isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select status from {ddprefix}_constraints where constraint_type = :type and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		# Report a missing constraint the same way the other methods do
		if rec is None:
			raise SQLObjectNotFoundError(self)
		return rec[0] == "ENABLED"

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` pairs for all user named
		constraints of this type, in ascending order.

		``owner`` selects whose constraints are yielded: :const:`None` uses
		``USER_CONSTRAINTS``, :const:`ALL` applies no owner filter, a string
		selects exactly this owner and anything else is treated as an
		iterable of owner names.
		"""
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		if owner is None:
			query = """ select null as owner, constraint_name from user_constraints where generated = 'USER NAME' and constraint_type = :type and constraint_name not like 'BIN$%' order by constraint_name """
			cursor.execute(query, type=cls.constraint_type)
		elif owner is ALL:
			query = f""" select decode(owner, user, null, owner) as owner, constraint_name from {ddprefix}_constraints where generated = 'USER NAME' and constraint_type = :type and constraint_name not like 'BIN$%' order by owner, constraint_name """
			cursor.execute(query, type=cls.constraint_type)
		elif isinstance(owner, str):
			query = f""" select decode(owner, user, null, owner) as owner, constraint_name from {ddprefix}_constraints where generated = 'USER NAME' and constraint_type = :type and constraint_name not like 'BIN$%' and owner = :owner order by owner, constraint_name """
			cursor.execute(query, type=cls.constraint_type, owner=owner)
		else:
			query = f""" select decode(owner, user, null, owner) as owner, constraint_name from {ddprefix}_constraints where generated = 'USER NAME' and constraint_type = :type and constraint_name not like 'BIN$%' and owner in ({', '.join(sqlstr(o) for o in owner)}) order by owner, constraint_name """
			cursor.execute(query, type=cls.constraint_type)
		for rec in cursor:
			yield (rec.constraint_name, rec.owner)

	def fixname(self, code):
		"""
		Return ``code`` (an ``alter table ... add constraint ...`` statement)
		with the sixth word, i.e. the constraint name, replaced by the full
		name of ``self``. The table name (third word) and everything after
		the constraint name are kept.
		"""
		code = code.split(None, 6)
		code = f"alter table {code[2]} add constraint {self.getfullname()} {code[6]}"
		return code

	def table(self, connection=None):
		"""
		Return the :class:`Table` ``self`` belongs to. Raises
		:exc:`SQLObjectNotFoundError` if the constraint isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select table_name from {ddprefix}_constraints where constraint_type=:type and owner=nvl(:owner, user) and constraint_name=:name """
		cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		# Report a missing constraint the same way the other methods do
		if rec is None:
			raise SQLObjectNotFoundError(self)
		return Table(rec.table_name, self.owner, connection)
class PrimaryKey(Constraint):
	"""
	Models a primary key constraint in the database.
	"""

	type = "pk"
	constraint_type = "P"

	def columns(self, connection=None):
		"""
		Generator that yields a :class:`Column` object for every column this
		primary key consists of, ordered by position. Raises
		:exc:`SQLObjectNotFoundError` if the constraint isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, constraint_name, table_name, r_owner, r_constraint_name from {ddprefix}_constraints where constraint_type = 'P' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		constraint = cursor.fetchone()
		if constraint is None:
			raise SQLObjectNotFoundError(self)
		# NOTE(review): the column name is built from the result of getfullname(),
		# whereas Table.columns() uses the bare table name -- confirm this is intended
		tablename = getfullname(constraint.table_name, constraint.owner)
		query = f""" select column_name from {ddprefix}_cons_columns where owner=nvl(:owner, user) and constraint_name=:name order by position """
		cursor.execute(query, owner=self.owner, name=self.name)
		for column in cursor:
			yield Column(f"{tablename}.{column.column_name}", self.owner, connection)

	def createsql(self, connection=None, term=True):
		"""
		Return an ``alter table ... add constraint ... primary key(...)``
		statement for this primary key. ``term`` decides whether it is
		terminated with ``;``. Raises :exc:`SQLObjectNotFoundError` if the
		constraint isn't found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, constraint_name, table_name, r_owner, r_constraint_name from {ddprefix}_constraints where constraint_type = 'P' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		constraint = cursor.fetchone()
		if constraint is None:
			raise SQLObjectNotFoundError(self)
		query = f""" select column_name from {ddprefix}_cons_columns where owner=nvl(:owner, user) and constraint_name=:name order by position """
		cursor.execute(query, owner=self.owner, name=self.name)
		tablename = getfullname(constraint.table_name, constraint.owner)
		pkname = getfullname(self.name, None)
		fields = ", ".join(column.column_name for column in cursor)
		terminator = ";\n" if term else "\n"
		return f"alter table {tablename} add constraint {pkname} primary key({fields}){terminator}"

	def referencedby(self, connection=None):
		"""
		Generator that yields the :class:`ForeignKey` constraints that refer
		to this primary key.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, constraint_name from {ddprefix}_constraints where constraint_type='R' and r_owner=nvl(:owner, user) and r_constraint_name=:name """
		cursor.execute(query, owner=self.owner, name=self.name)
		for fk in cursor.fetchall():
			yield ForeignKey(fk.constraint_name, fk.owner, connection)
		# Normally there is an index for this primary key, but we ignore it, as for the purpose of :mod:`orasql` this index doesn't exist

	def references(self, connection=None):
		# A primary key only depends on the table it belongs to
		yield self.table(connection)
class ForeignKey(Constraint):
	"""
	Models a foreign key constraint in the database.
	"""
	type = "fk"
	constraint_type = "R"

	def createsql(self, connection=None, term=True):
		"""
		Return the ``alter table ... add constraint ... foreign key (...)
		references ...`` statement for this foreign key.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the foreign key
		can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		# Add constraint_type to the query, so we don't pick up another constraint by accident
		ddprefix = cursor.ddprefix()
		query = f""" select decode(r_owner, user, null, r_owner) as r_owner, r_constraint_name, table_name from {ddprefix}_constraints where constraint_type = 'R' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		query = f""" select column_name from {ddprefix}_cons_columns where owner=nvl(:owner, user) and constraint_name=:name order by position """
		cursor.execute(query, owner=self.owner, name=self.name)
		fields1 = ", ".join(r.column_name for r in cursor)
		query = f""" select table_name, column_name from {ddprefix}_cons_columns where owner=nvl(:owner, user) and constraint_name=:name order by position """
		cursor.execute(query, owner=rec.r_owner, name=rec.r_constraint_name)
		# A composite key must be rendered as ``table(col1, col2)``; repeating the table for every column (``table(col1), table(col2)``) is not valid SQL
		reftable = None
		refcolumns = []
		for r in cursor:
			if reftable is None:
				reftable = getfullname(r.table_name, rec.r_owner)
			refcolumns.append(r.column_name)
		if reftable is None:
			# No column information available for the referenced constraint
			fields2 = ""
		else:
			fields2 = f"{reftable}({', '.join(refcolumns)})"
		tablename = getfullname(rec.table_name, self.owner)
		fkname = getfullname(self.name, None)
		code = f"alter table {tablename} add constraint {fkname} foreign key ({fields1}) references {fields2}"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def referencedby(self, connection=None):
		"""
		Yield nothing.
		"""
		# Shortcut: Nobody references a foreign key
		yield from ()

	def references(self, connection=None):
		"""
		Yield the table this foreign key belongs to, followed by the
		constraint it references.
		"""
		yield self.table(connection)
		yield self.refconstraint(connection)

	def refconstraint(self, connection=None):
		"""
		Return the constraint referenced by ``self``.

		In most cases this is a :class:`PrimaryKey`, but it also might be a
		:class:`UniqueConstraint`.

		Raises :exc:`SQLObjectNotFoundError` if the foreign key (or the
		constraint it references) can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select c2.constraint_type, decode(c1.r_owner, user, null, c1.r_owner) as r_owner, c1.r_constraint_name from {ddprefix}_constraints c1, {ddprefix}_constraints c2 where c1.constraint_type = 'R' and c1.owner = nvl(:owner, user) and c1.constraint_name = :name and c1.r_owner = c2.owner and c1.r_constraint_name = c2.constraint_name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			# Without this check a failed lookup ended in an :exc:`AttributeError` on ``None``
			raise SQLObjectNotFoundError(self)
		types = {"P": PrimaryKey, "U": UniqueConstraint}
		return types[rec.constraint_type](rec.r_constraint_name, rec.r_owner, connection)

	def columns(self, connection=None):
		"""
		Return an iterator over the columns this foreign key consists of.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, table_name, column_name from {ddprefix}_cons_columns where constraint_name=:name and owner=nvl(:owner, user) order by position """
		cursor.execute(query, owner=self.owner, name=self.name)
		for r in cursor:
			yield Column(f"{r.table_name}.{r.column_name}", r.owner, connection)
class UniqueConstraint(Constraint):
	"""
	Models a unique constraint in the database.
	"""
	type = "unique"
	constraint_type = "U"

	def createsql(self, connection=None, term=True):
		"""
		Return the ``alter table ... add constraint ... unique(...)``
		statement for this constraint.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the constraint
		can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		# Add constraint_type to the query, so we don't pick up another constraint by accident
		ddprefix = cursor.ddprefix()
		query = f""" select table_name from {ddprefix}_constraints where constraint_type='U' and owner=nvl(:owner, user) and constraint_name=:name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		tablename = getfullname(rec.table_name, self.owner)
		uniquename = getfullname(self.name, None)
		# Read the columns through the same dictionary prefix as the constraint itself (this query used to be hard-wired to ``all_cons_columns``)
		query = f""" select column_name from {ddprefix}_cons_columns where owner = nvl(:owner, user) and constraint_name = :name order by position """
		cursor.execute(query, owner=self.owner, name=self.name)
		fields = ", ".join(r.column_name for r in cursor)
		code = f"alter table {tablename} add constraint {uniquename} unique({fields})"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def referencedby(self, connection=None):
		"""
		Yield the :class:`ForeignKey` constraints that point at this unique
		constraint.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, constraint_name from {ddprefix}_constraints where constraint_type = 'R' and r_owner = nvl(:owner, user) and r_constraint_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor.fetchall():
			yield ForeignKey(rec.constraint_name, rec.owner, connection)
		# Normally there is an index for this constraint, but we ignore it, as for the purpose of :mod:`orasql` this index doesn't exist

	def references(self, connection=None):
		"""
		Yield the :class:`Table` this unique constraint is defined on.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select decode(owner, user, null, owner) as owner, table_name from {ddprefix}_constraints where constraint_type = 'U' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor.fetchall():
			yield Table(rec.table_name, rec.owner, connection)
class CheckConstraint(Constraint):
	"""
	Models a check constraint in the database.
	"""
	type = "check"
	constraint_type = "C"

	def createsql(self, connection=None, term=True):
		"""
		Return the ``alter table ... add constraint ... check (...)``
		statement for this constraint.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the constraint
		can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		# Add constraint_type to the query, so we don't pick up another constraint by accident
		ddprefix = cursor.ddprefix()
		sql = f""" select table_name, search_condition from {ddprefix}_constraints where constraint_type = 'C' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		tablename = getfullname(row.table_name, self.owner)
		checkname = getfullname(self.name, None)
		terminator = ";\n" if term else "\n"
		return f"alter table {tablename} add constraint {checkname} check ({row.search_condition}){terminator}"

	def referencedby(self, connection=None):
		"""
		Yield nothing.
		"""
		# Shortcut: Nobody references a check constraint
		yield from ()

	def references(self, connection=None):
		"""
		Yield the :class:`Table` this check constraint is defined on.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select decode(owner, user, null, owner) as owner, table_name from {ddprefix}_constraints where constraint_type = 'C' and owner = nvl(:owner, user) and constraint_name = :name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		rows = cursor.fetchall()
		for row in rows:
			yield Table(row.table_name, row.owner, connection)
class Index(MixinNormalDates, OwnedSchemaObject):
	"""
	Models an index in the database.

	An index that is generated by a primary key or unique constraint (see
	:meth:`constraint`) is treated as if it didn't exist: :meth:`exists`
	returns false for it and the SQL generating methods return an empty string.
	"""
	type = "index"

	def exists(self, connection=None):
		"""
		Return whether the index exists. Indexes generated by a constraint
		are reported as not existing.
		"""
		(connection, cursor) = self.getcursor(connection)
		if self.isconstraint(connection):
			return False
		ddprefix = cursor.ddprefix()
		query = f""" select 1 from {ddprefix}_indexes where owner = nvl(:owner, user) and index_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		return rec is not None

	def createsql(self, connection=None, term=True):
		"""
		Return the ``create index`` statement for this index, or ``""`` if
		the index is generated by a constraint.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the index can't be
		found.
		"""
		(connection, cursor) = self.getcursor(connection)
		if self.isconstraint(connection):
			return ""
		ddprefix = cursor.ddprefix()
		query = f""" select index_name, table_name, uniqueness, index_type, logging, ityp_owner, ityp_name, parameters from {ddprefix}_indexes where owner = nvl(:owner, user) and index_name = :name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		tablename = getfullname(rec.table_name, self.owner)
		indexname = self.getfullname()
		unique = " unique" if rec.uniqueness == "UNIQUE" else ""
		logging = " nologging" if rec.logging == "NO" else ""
		# The outer join picks up the expression for function based index columns
		query = f""" select aie.column_expression, aic.column_name from {ddprefix}_ind_columns aic, {ddprefix}_ind_expressions aie where aic.index_owner = aie.index_owner(+) and aic.index_name = aie.index_name(+) and aic.column_position = aie.column_position(+) and aic.index_owner = nvl(:owner, user) and aic.index_name = :name order by aic.column_position """
		cursor.execute(query, owner=self.owner, name=self.name)
		fields = ", ".join(r.column_expression or r.column_name for r in cursor)
		code = f"create{unique} index {indexname} on {tablename} ({fields}){logging}"
		if rec.index_type == "DOMAIN":
			if rec.parameters:
				parameters = f" parameters ({sqlliteral(rec.parameters)})"
			else:
				parameters = ""
			code += f" indextype is {rec.ityp_owner}.{rec.ityp_name}{parameters}"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def dropsql(self, connection=None, term=True):
		"""
		Return the ``drop index`` statement for this index, or ``""`` if the
		index is generated by a constraint.
		"""
		if self.isconstraint(connection):
			return ""
		code = f"drop index {self.getfullname()}"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	def rebuildsql(self, connection=None, term=True):
		"""
		Return SQL code to rebuild this index, or ``""`` if the index is
		generated by a constraint.
		"""
		if self.isconstraint(connection):
			return ""
		code = f"alter index {self.getfullname()} rebuild"
		if term:
			code += ";\n"
		else:
			code += "\n"
		return code

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` tuples for all indexes that
		are not generated by a unique or primary key constraint.

		``owner`` may be :const:`None` (indexes in ``user_indexes``),
		:const:`ALL` (every owner), a string (one owner) or an iterable of
		owner names. Indexes of type ``LOB`` and ``IOT - TOP`` and names
		starting with ``BIN$`` are skipped.
		"""
		# We skip those indexes that are generated by a constraint
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		if owner is None:
			query = """ select null as owner, index_name from ( select index_name from user_indexes where index_type not in ('LOB', 'IOT - TOP') minus select index_name from user_constraints where constraint_type in ('U', 'P') and owner = user ) where index_name not like 'BIN$%' order by index_name """
			cursor.execute(query)
		elif owner is ALL:
			query = f""" select decode(owner, user, null, owner) as owner, index_name from ( select owner, index_name from {ddprefix}_indexes where index_type not in ('LOB', 'IOT - TOP') minus select index_owner, index_name from {ddprefix}_constraints where constraint_type in ('U', 'P') ) where index_name not like 'BIN$%' order by owner, index_name """
			cursor.execute(query)
		elif isinstance(owner, str):
			query = f""" select decode(owner, user, null, owner) as owner, index_name from ( select owner, index_name from {ddprefix}_indexes where index_type not in ('LOB', 'IOT - TOP') and owner = :owner minus select index_owner, index_name from {ddprefix}_constraints where constraint_type in ('U', 'P') and index_owner = :owner ) where index_name not like 'BIN$%' order by owner, index_name """
			cursor.execute(query, owner=owner)
		else:
			# Build the owner list once: it is needed twice and ``owner`` might be an iterator that can only be consumed a single time
			owners = ", ".join(sqlstr(o) for o in owner)
			query = f""" select decode(owner, user, null, owner) as owner, index_name from ( select owner, index_name from {ddprefix}_indexes where index_type not in ('LOB', 'IOT - TOP') and owner in ({owners}) minus select index_owner, index_name from {ddprefix}_constraints where constraint_type in ('U', 'P') and index_owner in ({owners}) ) where index_name not like 'BIN$%' order by owner, index_name """
			cursor.execute(query)
		for row in cursor:
			yield (row.index_name, row.owner)

	def fixname(self, code):
		"""
		Return the ``create [unique] index`` statement ``code`` with the
		index name replaced by the full name of ``self``.

		Everything following the original index name (starting with the
		``on`` keyword) is kept unchanged.
		"""
		# The index name is the fourth word of ``create unique index NAME ...`` and the third word of ``create index NAME ...``; the previous split limits were one too high and dropped the ``on`` keyword
		if code.lower().startswith("create unique"):
			code = code.split(None, 4)
			code = f"create unique index {self.getfullname()} {code[4]}"
		else:
			code = code.split(None, 3)
			code = f"create index {self.getfullname()} {code[3]}"
		return code

	def constraint(self, connection=None):
		"""
		If this index is generated by a constraint, return the constraint
		otherwise return :const:`None`.

		The constraint is looked up by the owner and name of the index and is
		either a :class:`UniqueConstraint` or a :class:`PrimaryKey`.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select constraint_type from {ddprefix}_constraints where owner = nvl(:owner, user) and constraint_name = :name and constraint_type in ('U', 'P') """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is not None:
			rec = {"U": UniqueConstraint, "P": PrimaryKey}[rec.constraint_type](self.name, self.owner, connection)
		return rec

	def isconstraint(self, connection=None):
		"""
		Is this index generated by a constraint?
		"""
		return self.constraint(connection) is not None

	def references(self, connection=None):
		"""
		Yield the objects this index depends on.

		For a domain index these are the :class:`Preference` objects named
		after the ``datastore``, ``lexer``, ``stoplist`` and ``wordlist``
		keywords in the index parameters, followed by whatever the base class
		yields. An index generated by a constraint yields nothing.

		Raises :exc:`SQLObjectNotFoundError` if the index can't be found.
		"""
		constraint = self.constraint(connection)
		# If ``self`` is generated by a constraint (i.e. ``constraint`` is not :const:`None`), we ignore all dependencies (such an index is never produced by :meth:`objects`)
		if constraint is None:
			(connection, cursor) = self.getcursor(connection)
			ddprefix = cursor.ddprefix()
			# If this is a domain index, reference the preferences defined there
			query = f""" select index_type, parameters from {ddprefix}_indexes where owner = nvl(:owner, user) and index_name = :name """
			cursor.execute(query, owner=self.owner, name=self.name)
			rec = cursor.fetchone()
			if rec is None:
				raise SQLObjectNotFoundError(self)
			# ``parameters`` can be empty (:meth:`createsql` allows for that too), and :func:`re.split` can't handle :const:`None`
			if rec.index_type == "DOMAIN" and rec.parameters:
				keywords = ("datastore", "lexer", "stoplist", "wordlist")
				# Splitting on a capturing group keeps the keywords, so the result alternates between text and keyword
				parameters = re.split('\\b(datastore|memory|lexer|stoplist|wordlist)\\b', rec.parameters, flags=re.IGNORECASE)
				foundparameter = None
				for parameter in parameters:
					if foundparameter:
						# ``parameter`` is the text following the keyword, i.e. the (optionally owner qualified) preference name
						if foundparameter.lower() in keywords:
							(prefowner, sep, prefname) = parameter.strip().partition(".")
							if sep:
								yield Preference(prefname.upper(), prefowner)
							else:
								yield Preference(prefowner.upper())
						foundparameter = None
					elif parameter.lower() in keywords:
						foundparameter = parameter
			yield from super().references(connection)

	def table(self, connection=None):
		"""
		Return the :class:`Table` ``self`` belongs to.

		Raises :exc:`SQLObjectNotFoundError` if the index can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select table_name, decode(table_owner, user, null, table_owner) as table_owner from {ddprefix}_indexes where owner=nvl(:owner, user) and index_name=:name """
		cursor.execute(query, owner=self.owner, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			# Without this check a missing index ended in an :exc:`AttributeError` on ``None``
			raise SQLObjectNotFoundError(self)
		return Table(rec.table_name, rec.table_owner, connection)

	def columns(self, connection=None):
		"""
		Return an iterator over the columns this index consists of.

		Raises :exc:`TypeError` if one of the index columns is an expression.
		"""
		(connection, cursor) = self.getcursor(connection)
		table = self.table(connection)
		ddprefix = cursor.ddprefix()
		query = f""" select aie.column_expression, aic.column_name from {ddprefix}_ind_columns aic, {ddprefix}_ind_expressions aie where aic.index_owner = aie.index_owner(+) and aic.index_name = aie.index_name(+) and aic.column_position = aie.column_position(+) and aic.index_owner = nvl(:owner, user) and aic.index_name = :name order by aic.column_position """
		cursor.execute(query, owner=self.owner, name=self.name)
		for rec in cursor:
			if rec.column_expression is not None:
				raise TypeError(f"{self!r} contains an index expression")
			# Hand the connection on to the column, as the ``columns`` methods of the constraint classes do
			yield Column(f"{table.name}.{rec.column_name}", table.owner, connection)
class Synonym(OwnedSchemaObject):
	"""
	Models a synonym in the database.

	A synonym whose owner is ``"PUBLIC"`` is created and dropped as a
	``public synonym``.
	"""
	type = "synonym"

	def exists(self, connection=None):
		"""
		Return whether the synonym exists.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select 1 from {ddprefix}_synonyms where owner=nvl(:owner, user) and synonym_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return the ``create or replace [public] synonym`` statement for this
		synonym. A database link is appended as ``@link`` to the target name.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the synonym can't
		be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select decode(table_owner, user, null, table_owner) as table_owner, table_name, db_link from {ddprefix}_synonyms where owner = nvl(:owner, user) and synonym_name = :name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		# Public synonyms are written without an owner prefix
		ispublic = self.owner == "PUBLIC"
		public = "public " if ispublic else ""
		synname = getfullname(self.name, None if ispublic else self.owner)
		targetname = getfullname(row.table_name, row.table_owner)
		parts = [f"create or replace {public}synonym {synname} for {targetname}"]
		if row.db_link is not None:
			parts.append(f"@{row.db_link}")
		parts.append(";\n" if term else "\n")
		return "".join(parts)

	def dropsql(self, connection=None, term=True):
		"""
		Return the ``drop [public] synonym`` statement for this synonym.
		"""
		# Public synonyms are written without an owner prefix
		ispublic = self.owner == "PUBLIC"
		public = "public " if ispublic else ""
		synname = getfullname(self.name, None if ispublic else self.owner)
		terminator = ";\n" if term else "\n"
		return f"drop {public}synonym {synname}{terminator}"

	def fixname(self, code):
		"""
		Return the ``create or replace [public] synonym`` statement ``code``
		with the synonym name replaced by the full name of ``self``.
		"""
		if code.lower().startswith("create or replace public"):
			words = code.split(None, 6)
			return f"create or replace public synonym {self.getfullname()} {words[6]}"
		words = code.split(None, 5)
		return f"create or replace synonym {self.getfullname()} {words[5]}"

	def object(self, connection=None):
		"""
		Return the object for which ``self`` is a synonym.

		Raises :exc:`SQLObjectNotFoundError` if the synonym can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select table_owner, table_name, db_link from {ddprefix}_synonyms where owner=nvl(:owner, user) and synonym_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		return connection._getobject(row.table_name, row.table_owner)

	@classmethod
	def names(cls, connection, owner=None, object_owner=ALL):
		"""
		Generator that yields the names of all synonyms as ``(name, owner)``
		tuples.

		For the meaning of ``owner`` and ``object_owner`` see :meth:`objects`.

		Names will be in ascending order.
		"""
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		conditions = []
		bindvars = {}

		# Restriction on the owner of the synonym itself
		if owner is None:
			view_sql = "user_synonyms"
			owner_sql = "null as owner"
			order_sql = "table_owner, synonym_name"
		else:
			view_sql = f"{ddprefix}_synonyms"
			owner_sql = "decode(owner, user, null, owner) as owner"
			order_sql = "owner, table_owner, synonym_name"
			if owner is not ALL:
				if isinstance(owner, str):
					conditions.append("owner = :owner")
					bindvars["owner"] = owner
				else:
					conditions.append(f"owner in ({', '.join(sqlstr(o) for o in owner)})")

		# Restriction on the owner of the object the synonym points to
		if object_owner is None:
			conditions.append("table_owner = user")
		elif object_owner is not ALL:
			if isinstance(object_owner, str):
				conditions.append("table_owner = :table_owner")
				bindvars["table_owner"] = object_owner
			else:
				conditions.append(f"table_owner in ({', '.join(sqlstr(o) for o in object_owner)})")

		sql = f"select {owner_sql}, synonym_name from {view_sql}"
		if conditions:
			sql += f" where {' and '.join(conditions)}"
		sql += f" order by {order_sql}"
		cursor.execute(sql, **bindvars)
		for synrow in cursor:
			yield (synrow.synonym_name, synrow.owner)

	@classmethod
	def objects(cls, connection, owner=None, object_owner=ALL):
		"""
		Generator that yields all synonyms in the current user's schema.

		The argument ``owner`` specifies which owner the synonym must belong
		to in order to be yielded. The argument ``object_owner`` specifies
		which owner the object must belong to in order to be yielded.

		For more information see :func:`owned`.
		"""
		for (synname, synowner) in cls.names(connection, owner, object_owner):
			yield cls(synname, synowner, connection)
class View(MixinNormalDates, OwnedSchemaObject):
	"""
	Models a view in the database.
	"""
	type = "view"

	def exists(self, connection=None):
		"""
		Return whether the view exists.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select 1 from {ddprefix}_views where owner=nvl(:owner, user) and view_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return the ``create or replace force view`` statement for this view.

		If ``term`` is true the statement ends with ``"\\n/\\n"``, otherwise
		with ``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the view can't
		be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select text from {ddprefix}_views where owner=nvl(:owner, user) and view_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		# Strip trailing whitespace
		lines = [line.rstrip() for line in (row.text or "").strip().splitlines()]
		body = "\n".join(lines)
		terminator = "\n/\n" if term else "\n"
		return f"create or replace force view {self.getfullname()} as\n\t{body}{terminator}"

	def dropsql(self, connection=None, term=True):
		"""
		Return the ``drop view`` statement for this view.
		"""
		terminator = ";\n" if term else "\n"
		return f"drop view {self.getfullname()}{terminator}"

	def fixname(self, code):
		"""
		Return the ``create or replace force view`` statement ``code`` with
		the view name replaced by the full name of ``self``.
		"""
		words = code.split(None, 6)
		return f"create or replace force view {self.getfullname()} {words[6]}"

	def records(self, connection=None):
		"""
		Return an iterator over all records of the view.
		"""
		(connection, cursor) = self.getcursor(connection)
		cursor.execute(f"select * from {self.getfullname()}")
		return iter(cursor)
class MaterializedView(View):
	"""
	Models a materialized view in the database.
	"""
	type = "materialized view"

	def exists(self, connection=None):
		"""
		Return whether the materialized view exists.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select 1 from {ddprefix}_mviews where owner=nvl(:owner, user) and mview_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return the ``create materialized view`` statement (including the
		refresh method and mode) for this materialized view.

		If ``term`` is true the statement ends with ``"\\n/\\n"``, otherwise
		with ``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the
		materialized view can't be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select * from {ddprefix}_mviews where owner = nvl(:owner, user) and mview_name = :name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		# Strip trailing whitespace
		lines = [line.rstrip() for line in row.query.strip().splitlines()]
		body = "\n".join(lines)
		terminator = "\n/\n" if term else "\n"
		return f"create materialized view {self.getfullname()}\nrefresh {row.refresh_method} on {row.refresh_mode} as\n\t{body}{terminator}"

	def dropsql(self, connection=None, term=True):
		"""
		Return the ``drop materialized view`` statement.
		"""
		terminator = ";\n" if term else "\n"
		return f"drop materialized view {self.getfullname()}{terminator}"

	def fixname(self, code):
		"""
		Return the ``create materialized view`` statement ``code`` with the
		name replaced by the full name of ``self``.
		"""
		words = code.split(None, 4)
		return f"create materialized view {self.getfullname()} {words[4]}"

	def references(self, connection=None):
		"""
		Yield everything the base class yields, except for the table that has
		the same name and owner as ``self``.
		"""
		# skip the table
		for obj in super().references(connection):
			if isinstance(obj, Table) and obj.name == self.name and obj.owner == self.owner:
				continue
			yield obj

	def referencedby(self, connection=None):
		"""
		Yield the :class:`Table` with the same name and owner as ``self``.
		"""
		connection = self.getconnection(connection)
		yield Table(self.name, self.owner, connection)
class Library(OwnedSchemaObject):
	"""
	Models a library in the database.
	"""
	type = "library"

	def exists(self, connection=None):
		"""
		Return whether the library exists.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select file_spec from {ddprefix}_libraries where owner = nvl(:owner, user) and library_name = :name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return the ``create or replace library`` statement for this library.
		The file specification is inserted as its Python ``repr``.

		If ``term`` is true the statement ends with ``";\\n"``, otherwise with
		``"\\n"``. Raises :exc:`SQLObjectNotFoundError` if the library can't
		be found.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		sql = f""" select file_spec from {ddprefix}_libraries where owner=nvl(:owner, user) and library_name=:name """
		cursor.execute(sql, owner=self.owner, name=self.name)
		row = cursor.fetchone()
		if row is None:
			raise SQLObjectNotFoundError(self)
		terminator = ";\n" if term else "\n"
		return f"create or replace library {self.getfullname()} as {row.file_spec!r}{terminator}"

	def dropsql(self, connection=None, term=True):
		"""
		Return the ``drop library`` statement for this library.
		"""
		terminator = ";\n" if term else "\n"
		return f"drop library {self.getfullname()}{terminator}"

	def fixname(self, code):
		"""
		Return the ``create or replace library`` statement ``code`` with the
		library name replaced by the full name of ``self``.
		"""
		words = code.split(None, 5)
		return f"create or replace library {self.getfullname()} {words[5]}"
class Argument:
	"""
	:class:`Argument` objects hold information about the arguments of a
	stored procedure.

	Each object stores the argument ``name``, its ``position``, its
	``datatype`` and the two flags ``isin`` and ``isout``.
	"""

	def __init__(self, name, position, datatype, isin, isout):
		(self.name, self.position, self.datatype) = (name, position, datatype)
		(self.isin, self.isout) = (isin, isout)

	def __repr__(self):
		cls = type(self)
		return f"<{cls.__module__}.{cls.__qualname__} name={self.name!r} position={self.position!r} datatype={self.datatype!r} at {id(self):#x}>"
[docs]class Callable(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject): """ Models a callable object in the database, i.e. functions and procedures. """ _ora2cx = { "date": datetime.datetime, "timestamp": datetime.datetime, "timestamp with time zone": datetime.datetime, "number": float, "varchar2": str, "clob": DB_TYPE_CLOB, "blob": DB_TYPE_BLOB, } def __init__(self, name, owner=None, connection=None): super().__init__(name, owner, connection) self._argsbypos = None self._argsbyname = None self._returnvalue = None def _calcargs(self, cursor): if self._argsbypos is None: ddprefix = cursor.ddprefix() if "." in self.name: (package_name, procedure_name) = self.name.split(".") query = f""" select object_name from {ddprefix}_procedures where owner = nvl(:owner, user) and object_name = :package_name and procedure_name = :procedure_name """ cursor.execute(query, owner=self.owner, package_name=package_name, procedure_name=procedure_name) else: package_name = None procedure_name = self.name query = f""" select object_name from {ddprefix}_procedures where owner = nvl(:owner, user) and object_name = :name and procedure_name is null """ cursor.execute(query, owner=self.owner, name=procedure_name) if cursor.fetchone() is None: raise SQLObjectNotFoundError(self) self._argsbypos = [] self._argsbyname = {} if package_name is not None: query = f""" select lower(argument_name) as name, lower(in_out) as in_out, lower(data_type) as datatype from {ddprefix}_arguments where owner = nvl(:owner, user) and package_name = :package_name and object_name = :procedure_name and data_level = 0 order by sequence """ cursor.execute(query, owner=self.owner, package_name=package_name, procedure_name=procedure_name) else: query = f""" select lower(argument_name) as name, lower(in_out) as in_out, lower(data_type) as datatype from {ddprefix}_arguments where owner = nvl(:owner, user) and package_name is null and object_name = :procedure_name and data_level = 0 order by sequence """ cursor.execute(query, 
owner=self.owner, procedure_name=procedure_name) i = 0 # argument position (skip return value) for record in cursor: arginfo = Argument(record.name, i, record.datatype, "in" in record.in_out, "out" in record.in_out) if record.name is None: # this is the return value self._returnvalue = arginfo else: self._argsbypos.append(arginfo) self._argsbyname[arginfo.name] = arginfo i += 1 def _getargs(self, cursor, *args, **kwargs): queryargs = {} if len(args) > len(self._argsbypos): raise TypeError(f"too many parameters for {self!r}: {len(args):,} given, {len(self._argsbypos):,} expected") # Handle positional arguments for (arg, arginfo) in zip(args, self._argsbypos): queryargs[arginfo.name] = self._wraparg(cursor, arginfo, arg) # Handle keyword arguments for (argname, arg) in kwargs.items(): argname = argname.lower() if argname in queryargs: raise TypeError(f"duplicate argument for {self!r}: {argname}") try: arginfo = self._argsbyname[argname] except KeyError: raise TypeError(f"unknown parameter for {self!r}: {argname}") queryargs[arginfo.name] = self._wraparg(cursor, arginfo, arg) # Add out parameters for anything that hasn't been specified for arginfo in self._argsbypos: if arginfo.name not in queryargs and arginfo.isout: queryargs[arginfo.name] = self._wraparg(cursor, arginfo, None) return queryargs def _wraparg(self, cursor, arginfo, arg): try: if arg is None: t = self._ora2cx[arginfo.datatype] else: t = arg.type if isinstance(arg, Object) else type(arg) except KeyError: raise TypeError(f"can't handle parameter {arginfo.name} of type {arginfo.datatype} with value {arg!r} in {self!r}") if isinstance(arg, bytes): # ``bytes`` is treated as binary data, always wrap it in a ``BLOB`` t = DB_TYPE_BLOB elif isinstance(arg, str) and len(arg) >= 2000: t = DB_TYPE_CLOB var = cursor.var(t) var.setvalue(0, arg) return var def _unwraparg(self, arginfo, cursor, value): if isinstance(value, LOB): value = _decodelob(value, cursor.readlobs) return value def _makerecord(self, cursor, 
args): index2name = [] values = [] for arginfo in self._argsbypos: name = arginfo.name if name in args: index2name.append(name) values.append(self._unwraparg(arginfo, cursor, args[name].getvalue(0))) name2index = dict(zip(index2name, itertools.count())) return Record(index2name, name2index, values)
[docs] def arguments(self, connection=None): """ Generator that yields all arguments of the function/procedure ``self``. """ (connection, cursor) = self.getcursor(connection) self._calcargs(cursor) yield from self._argsbypos
def synonyms(self, connection=None):
	"""
	Generator that yields the synonyms for ``self``.

	Objects whose name contains a ``.`` never yield anything; for all others
	the work is delegated to the base class implementation.
	"""
	if "." in self.name:
		return
	yield from super().synonyms(connection)

def privileges(self, connection=None):
	"""
	Generator that yields the privileges for ``self``.

	Objects whose name contains a ``.`` never yield anything; for all others
	the work is delegated to the base class implementation.
	"""
	if "." in self.name:
		return
	yield from super().privileges(connection)
class Procedure(Callable):
	"""
	Models a procedure in the database.

	Calling a :class:`Procedure` object executes the procedure, so it can be
	used as a wrapper that supports positional and keyword arguments.
	"""

	type = "procedure"

	def __call__(self, cursor, *args, **kwargs):
		"""
		Call the procedure with the positional arguments ``args`` and the
		keyword arguments ``kwargs``.

		``cursor`` must be a :class:`ll.orasql.Cursor` object. The return
		value is a :class:`Record` object built from all bind variables used
		in the call (i.e. all specified parameters and all out parameters).
		"""
		self._calcargs(cursor)
		procname = self.name if self.owner is None else f"{self.owner}.{self.name}"
		queryargs = self._getargs(cursor, *args, **kwargs)
		# Every parameter is passed by name and bound to a variable with the same name
		bindings = [f"{argname}=>:{argname}" for argname in queryargs]
		query = f"begin {procname}({', '.join(bindings)}); end;"
		cursor.execute(query, queryargs)
		return self._makerecord(cursor, queryargs)
class Function(Callable):
	"""
	Models a function in the database.

	Calling a :class:`Function` object executes the function, so it can be
	used as a wrapper that supports positional and keyword arguments.
	"""

	type = "function"

	def __call__(self, cursor, *args, **kwargs):
		"""
		Call the function with the positional arguments ``args`` and the
		keyword arguments ``kwargs``.

		``cursor`` must be a :class:`ll.orasql.Cursor` object. The return
		value is a tuple ``(result, record)`` where ``result`` is the value
		returned by the function and ``record`` is a :class:`Record` object
		built from all other bind variables used in the call (i.e. all
		specified parameters and all out parameters).
		"""
		self._calcargs(cursor)
		funcname = self.name if self.owner is None else f"{self.owner}.{self.name}"
		queryargs = self._getargs(cursor, *args, **kwargs)

		# Find a bind variable name for the result that doesn't collide with a parameter name
		resultname = "r"
		while resultname in queryargs:
			resultname += "_"
		queryargs[resultname] = self._wraparg(cursor, self._returnvalue, None)

		bindings = ", ".join(f"{argname}=>:{argname}" for argname in queryargs if argname != resultname)
		query = f"begin :{resultname} := {funcname}({bindings}); end;"
		cursor.execute(query, queryargs)

		# Remove the result variable, so that the record only contains real parameters
		resultvar = queryargs.pop(resultname)
		result = self._unwraparg(self._returnvalue, cursor, resultvar.getvalue(0))
		return (result, self._makerecord(cursor, queryargs))
class Package(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
	"""
	Models a package (i.e. the package specification) in the database.
	"""

	# Name of this object type
	type = "package"
class PackageBody(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
	"""
	Models a package body in the database.

	Iterating over the synonyms or privileges of a package body never
	produces anything.
	"""

	# Name of this object type
	type = "package body"

	def synonyms(self, connection=None):
		"""
		Generator that yields nothing.
		"""
		yield from ()

	def privileges(self, connection=None):
		"""
		Generator that yields nothing.
		"""
		yield from ()
class Type(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
	"""
	Models a type definition (i.e. the type specification) in the database.
	"""

	# Name of this object type
	type = "type"
class TypeBody(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
	"""
	Models a type body in the database.

	Iterating over the synonyms or privileges of a type body never produces
	anything.
	"""

	# Name of this object type
	type = "type body"

	def synonyms(self, connection=None):
		"""
		Generator that yields nothing.
		"""
		yield from ()

	def privileges(self, connection=None):
		"""
		Generator that yields nothing.
		"""
		yield from ()
class Trigger(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
	"""
	Models a database trigger.
	"""

	# Name of this object type
	type = "trigger"
class JavaSource(MixinNormalDates, OwnedSchemaObject):
	"""
	Models Java source code stored in the database.
	"""

	type = "java source"

	def exists(self, connection=None):
		"""
		Return whether the data dictionary contains at least one source line
		of type ``JAVA SOURCE`` for the name and owner of ``self``.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"""
			select
				1
			from
				{ddprefix}_source
			where
				type = 'JAVA SOURCE' and
				owner = nvl(:owner, user) and
				name = :name
		"""
		cursor.execute(query, owner=self.owner, name=self.name)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return SQL code that (re)creates this Java source.

		The source lines are fetched ordered by line number, stripped of
		trailing whitespace and joined with newlines. If ``term`` is true
		the statement is terminated with a ``/`` line.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"""
			select
				text
			from
				{ddprefix}_source
			where
				type = 'JAVA SOURCE' and
				owner = nvl(:owner, user) and
				name = :name
			order by
				line
		"""
		cursor.execute(query, owner=self.owner, name=self.name)
		# A line might be ``None``, treat that as an empty line
		lines = [(rec.text or "").rstrip() for rec in cursor]
		source = "\n".join(lines).strip()
		code = f"create or replace and compile java source named {self.getfullname()} as\n{source}\n"
		if term:
			code += "/\n"
		return code

	def dropsql(self, connection=None, term=True):
		"""
		Return SQL code that drops this Java source. If ``term`` is true the
		statement is terminated with ``;``.
		"""
		terminator = ";\n" if term else "\n"
		return f"drop java source {self.getfullname()}{terminator}"

	def fixname(self, code):
		"""
		Return ``code`` with its first nine whitespace separated words
		replaced by a ``create or replace and compile java source named``
		header that uses the full name of ``self``.
		"""
		parts = code.split(None, 9)
		return f"create or replace and compile java source named {self.getfullname()} {parts[9]}"
class Privilege:
	"""
	Models a database object privilege (i.e. a grant).

	A :class:`Privilege` object has the following attributes:

		``privilege`` : string
			The type of the privilege (``EXECUTE`` etc.)

		``name`` : string
			The name of the object for which this privilege grants access

		``owner`` : string or :const:`None`
			the owner of the object

		``grantor`` : string or :const:`None`
			Who granted this privilege?

		``grantee`` : string or :const:`None`
			To whom has this privilege been granted?

		``connection`` : :class:`Connection` or :const:`None`
			The database connection
	"""

	type = "privilege"

	def __init__(self, privilege, name, grantor, grantee, owner=None, connection=None):
		self.privilege = privilege
		self.name = name
		self.grantor = grantor
		self.grantee = grantee
		self.owner = owner
		self.connection = connection

	def __repr__(self):
		if self.owner is not None:
			return f"{self.__class__.__module__}.{self.__class__.__qualname__}({self.privilege!r}, {self.name!r}, {self.grantee!r}, {self.owner!r})"
		else:
			return f"{self.__class__.__module__}.{self.__class__.__qualname__}({self.privilege!r}, {self.name!r}, {self.grantee!r})"

	def __str__(self):
		if self.owner is not None:
			return f"{self.privilege} privilege on {self.name} @ {self.owner} by {self.grantor} to {self.grantee}"
		else:
			return f"{self.privilege} privilege on {self.name} by {self.grantor} to {self.grantee}"

	def getconnection(self, connection):
		"""
		Return ``connection``, or ``self.connection`` if ``connection`` is
		:const:`None`. Raise a :exc:`TypeError` if both are :const:`None`.
		"""
		if connection is None:
			connection = self.connection
		if connection is None:
			raise TypeError("no connection available")
		return connection

	def getcursor(self, connection):
		"""
		Return a tuple ``(connection, cursor)`` with the connection chosen by
		:meth:`getconnection` and a new cursor for it.
		"""
		connection = self.getconnection(connection)
		return (connection, connection.cursor())

	def getconnectstring(self):
		"""
		Return the connect string of ``self.connection`` (or :const:`None` if
		there is no connection).
		"""
		if self.connection:
			return self.connection.connectstring()
		return None

	connectstring = property(getconnectstring)

	def object(self, connection=None):
		"""
		Return the object on which ``self`` grants a privilege.
		"""
		(connection, cursor) = self.getcursor(connection)
		return connection._getobject(self.name, self.grantor)

	@classmethod
	def objects(cls, connection, owner=None):
		"""
		Generator that yields object privileges. For the meaning of ``owner``
		see :func:`owned`.

		``owner`` may be :const:`None` (privileges on objects of the current
		user), a string (one specific owner), :const:`ALL` (all owners) or an
		iterable of owner names.
		"""
		cursor = connection.cursor() # can't use :meth:`getcursor` as we're in a classmethod
		if owner is None:
			query = """
				select
					null as owner,
					privilege,
					table_name as object,
					decode(grantor, user, null, grantor) as grantor,
					grantee
				from
					user_tab_privs
				where
					owner = user
				order by
					grantor,
					table_name,
					privilege,
					grantee
			"""
			cursor.execute(query)
		elif isinstance(owner, str):
			ddprefix = cursor.ddprefix()
			# The schema column is called ``TABLE_SCHEMA`` in ``ALL_TAB_PRIVS``, but ``OWNER`` in ``DBA_TAB_PRIVS``
			schemacol = "table_schema" if ddprefix == "all" else "owner"
			query = f"""
				select
					decode({schemacol}, user, null, {schemacol}) as owner,
					privilege,
					table_name as object,
					decode(grantor, user, null, grantor) as grantor,
					grantee
				from
					{ddprefix}_tab_privs
				where
					{schemacol} = :owner
				order by
					{schemacol},
					table_name,
					privilege
			"""
			cursor.execute(query, owner=owner)
		elif owner is ALL:
			ddprefix = cursor.ddprefix()
			# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
			if ddprefix == "all":
				query = """
					select
						decode(table_schema, user, null, table_schema) as owner,
						privilege,
						table_name as object,
						decode(grantor, user, null, grantor) as grantor,
						grantee
					from
						all_tab_privs
					order by
						grantor,
						table_name,
						privilege,
						grantee
				"""
			else:
				query = """
					select
						decode(owner, user, null, owner) as owner,
						privilege,
						table_name as object,
						decode(grantor, user, null, grantor) as grantor,
						grantee
					from
						dba_tab_privs
					order by
						grantor,
						table_name,
						privilege,
						grantee
				"""
			cursor.execute(query)
		else:
			ddprefix = cursor.ddprefix()
			# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
			if ddprefix == "all":
				query = f"""
					select
						decode(table_schema, user, null, table_schema) as owner,
						privilege,
						table_name as object,
						decode(grantor, user, null, grantor) as grantor,
						grantee
					from
						all_tab_privs
					where
						table_schema in ({', '.join(sqlstr(o) for o in owner)})
					order by
						table_name,
						privilege
				"""
			else:
				query = f"""
					select
						decode(owner, user, null, owner) as owner,
						privilege,
						table_name as object,
						decode(grantor, user, null, grantor) as grantor,
						grantee
					from
						dba_tab_privs
					where
						owner in ({', '.join(sqlstr(o) for o in owner)})
					order by
						table_name,
						privilege
				"""
			cursor.execute(query)
		for rec in cursor:
			yield Privilege(rec.privilege, rec.object, rec.grantor, rec.grantee, rec.owner, connection)

	def grantsql(self, connection=None, term=True, mapgrantee=True):
		"""
		Return SQL code to grant this privilege. If ``mapgrantee`` is a list
		or a dictionary and ``self.grantee`` is not in this list (or
		dictionary) no command will be returned. If it's a dictionary and
		``self.grantee`` is in it, the privilege will be granted to the user
		specified as the value instead of the original one. If ``mapgrantee``
		is true (the default) the privilege will be granted to the original
		grantee.

		Grantee names are compared case insensitively. If ``term`` is true
		the statement is terminated with ``;``.
		"""
		(connection, cursor) = self.getcursor(connection)
		if mapgrantee is True:
			grantee = self.grantee
		elif isinstance(mapgrantee, (set, list, tuple)):
			if self.grantee.lower() in (g.lower() for g in mapgrantee):
				grantee = self.grantee
			else:
				grantee = None
		else:
			mapgrantee = {key.lower(): value for (key, value) in mapgrantee.items()}
			grantee = mapgrantee.get(self.grantee.lower(), None)
		if grantee is None:
			return ""
		code = f"grant {self.privilege} on {self.name} to {grantee}"
		if term:
			code += ";\n"
		return code
class Column(OwnedSchemaObject):
	"""
	Models a single column of a table in the database. This is used to output
	``ALTER TABLE`` statements for adding, dropping and modifying columns.

	The name of a column has the form ``tablename.columnname``.
	"""

	type = "column"

	@property
	def tablename(self):
		# Everything before the first "."
		(table, _, _) = self.name.partition(".")
		return table

	@property
	def columnname(self):
		# Everything after the first "."
		(_, _, column) = self.name.partition(".")
		return column

	def exists(self, connection=None):
		"""
		Return whether the data dictionary contains this column.
		"""
		(connection, cursor) = self.getcursor(connection)
		parts = self.name.split(".")
		ddprefix = cursor.ddprefix()
		query = f"""
			select
				1
			from
				{ddprefix}_tab_columns
			where
				owner = nvl(:owner, user) and
				table_name = :table_name and
				column_name = :column_name
		"""
		cursor.execute(query, owner=self.owner, table_name=parts[0], column_name=parts[1])
		return cursor.fetchone() is not None

	def _getcolumnrecord(self, cursor):
		# Fetch the data dictionary record for this column (outer joined with the LOB info)
		# via ``cursor``; raise :exc:`SQLObjectNotFoundError` if there's no such column.
		parts = self.name.split(".")
		ddprefix = cursor.ddprefix()
		query = f"""
			select
				*
			from
				{ddprefix}_tab_columns c,
				{ddprefix}_lobs l
			where
				c.owner = nvl(:owner, user) and
				c.table_name = :table_name and
				c.column_name = :column_name and
				c.owner = l.owner(+) and
				c.table_name = l.table_name(+) and
				c.column_name = l.column_name(+)
		"""
		cursor.execute(query, owner=self.owner, table_name=parts[0], column_name=parts[1])
		record = cursor.fetchone()
		if record is None:
			raise SQLObjectNotFoundError(self)
		return record

	def addsql(self, connection=None, term=True):
		"""
		Return an ``ALTER TABLE ... ADD`` statement that adds this column
		(with its type, its default value unless that is ``null``, and
		``not null`` if the column isn't nullable).
		"""
		(connection, cursor) = self.getcursor(connection)
		rec = self._getcolumnrecord(cursor)
		parts = self.name.split(".")
		sql = f"alter table {getfullname(parts[0], self.owner)} add {getfullname(parts[1], None)}"
		sql += f" {_columntype(rec)}"
		default = _columndefault(rec)
		if default != "null":
			sql += f" default {default}"
		if rec.nullable == "N":
			sql += " not null"
		sql += ";\n" if term else "\n"
		return sql

	def modifysql(self, connection, cursorold, cursornew, term=True):
		"""
		Return an ``ALTER TABLE ... MODIFY`` statement that applies the
		difference between the column as seen by ``cursorold`` and the column
		as seen by ``cursornew`` to the column as seen by ``connection``.

		A :exc:`ConflictError` is raised if the type or the default value of
		the column can't be merged.
		"""
		(connection, cursor) = self.getcursor(connection)
		rec = self._getcolumnrecord(cursor)
		recold = self._getcolumnrecord(cursorold)
		recnew = self._getcolumnrecord(cursornew)
		parts = self.name.split(".")
		pieces = [f"alter table {getfullname(parts[0], self.owner)} modify {getfullname(parts[1], None)}"]

		def largest(attrname):
			# Largest value of the attribute over all three records (ignoring ``None``)
			return max(
				(getattr(r, attrname) for r in (rec, recold, recnew) if getattr(r, attrname) is not None),
				default=None,
			)

		# Has the type changed?
		typeattrs = ("data_precision", "data_length", "data_scale", "char_length", "data_type", "data_type_owner")
		if any(getattr(recold, attrname) != getattr(recnew, attrname) for attrname in typeattrs):
			sametype = rec.data_type == recold.data_type == recnew.data_type
			if sametype and rec.data_type_owner == recold.data_type_owner == recnew.data_type_owner:
				# Only the size has changed, so use the largest size of all three versions
				data_precision = largest("data_precision")
				data_scale = largest("data_scale")
				char_length = largest("char_length")
				columntype = _columntype(rec, data_precision=data_precision, data_scale=data_scale, char_length=char_length)
				pieces.append(f" {columntype}")
			else:
				# The type has changed too: This only works if our column already looks like the new one
				checks = [
					(("data_type", "data_type_owner"), "data_type unmergeable"),
					(("data_precision",), "data_precision unmergeable"),
					(("data_scale",), "data_scale unmergeable"),
					(("char_length",), "char_length unmergeable"),
				]
				for (attrnames, message) in checks:
					if any(getattr(recnew, attrname) != getattr(rec, attrname) for attrname in attrnames):
						raise ConflictError(self, message)
				pieces.append(f" {_columntype(recnew)}")

		# Has the default changed?
		default = _columndefault(rec)
		olddefault = _columndefault(recold)
		newdefault = _columndefault(recnew)
		if olddefault != newdefault:
			if newdefault != default:
				raise ConflictError(self, "default value unmergable")
			pieces.append(f" default {newdefault}")

		# Check nullability
		if recold.nullable != recnew.nullable:
			pieces.append(" not null" if recnew.nullable == "N" else " null")

		pieces.append(";\n" if term else "\n")
		return "".join(pieces)

	def dropsql(self, connection=None, term=True):
		"""
		Return an ``ALTER TABLE ... DROP COLUMN`` statement for this column.
		"""
		(connection, cursor) = self.getcursor(connection)
		parts = self.name.split(".")
		terminator = ";\n" if term else "\n"
		return f"alter table {getfullname(parts[0], self.owner)} drop column {getfullname(parts[1], None)}{terminator}"

	def table(self):
		"""
		Return the :class:`Table` object this column belongs to.
		"""
		tablename = self.name.split(".")[0]
		return Table(tablename, self.owner, self.connection)

	def cdate(self, connection=None):
		# The column creation date is the table creation date
		table = self.table()
		return table.cdate(connection)

	def udate(self, connection=None):
		# The column modification date is the table modification date
		table = self.table()
		return table.udate(connection)

	def references(self, connection=None):
		"""
		Generator that yields the table of this column.
		"""
		connection = self.getconnection(connection)
		tablename = self.name.split(".")[0]
		yield Table(tablename, self.owner, connection)

	def datatype(self, connection=None):
		"""
		The SQL type of this column.
		"""
		cursor = self.getcursor(connection)[1]
		return _columntype(self._getcolumnrecord(cursor))

	def default(self, connection=None):
		"""
		The SQL default value for this column.
		"""
		cursor = self.getcursor(connection)[1]
		return _columndefault(self._getcolumnrecord(cursor))

	def nullable(self, connection=None):
		"""
		Is this column nullable?
		"""
		cursor = self.getcursor(connection)[1]
		return self._getcolumnrecord(cursor).nullable == "Y"

	def compression(self, connection=None):
		"""
		The compression mode for this LOB column. Return ``None`` if this is
		not a LOB column, or it isn't compressed.
		"""
		cursor = self.getcursor(connection)[1]
		mode = self._getcolumnrecord(cursor).compression
		return None if mode == "NO" else mode

	def comment(self, connection=None):
		"""
		The comment for this column (:const:`None` if the comment is empty).
		Raises :exc:`SQLObjectNotFoundError` if the column doesn't exist.
		"""
		parts = self.name.split(".")
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"""
			select
				comments
			from
				{ddprefix}_col_comments
			where
				owner = nvl(:owner, user) and
				table_name = :table_name and
				column_name = :column_name
		"""
		cursor.execute(query, owner=self.owner, table_name=parts[0], column_name=parts[1])
		record = cursor.fetchone()
		if record is None:
			raise SQLObjectNotFoundError(self)
		return record.comments or None

	def synonyms(self, connection=None):
		# Iterating over the synonyms of a column never produces anything
		yield from ()

	def privileges(self, connection=None):
		# Iterating over the privileges of a column never produces anything
		yield from ()
class User(SchemaObject):
	"""
	Models a database user.
	"""

	def exists(self, connection=None):
		"""
		Return whether the data dictionary contains a user named
		``self.name``.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		cursor.execute(f"select 1 from {ddprefix}_users where username = :username", username=self.name)
		return cursor.fetchone() is not None

	@classmethod
	def names(cls, connection):
		"""
		Generator that yields the names of all users in ascending order
		"""
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		cursor.execute(f"select username from {ddprefix}_users order by username")
		yield from (row.username for row in cursor)

	@classmethod
	def objects(cls, connection):
		"""
		Generator that yields a :class:`User` object for each name produced
		by :meth:`names`.
		"""
		yield from (cls(username, connection) for username in cls.names(connection))
class Preference(OwnedSchemaObject):
	"""
	Models a preference (an entry of ``ctx_preferences``) in the database.
	"""

	type = "preference"

	def exists(self, connection=None):
		"""
		Return whether ``ctx_preferences`` contains this preference.
		"""
		(connection, cursor) = self.getcursor(connection)
		cursor.execute(
			"select 1 from ctx_preferences where pre_owner = nvl(:owner, user) and pre_name = :name",
			owner=self.owner,
			name=self.name,
		)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that creates this preference via
		``ctx_ddl.create_preference`` and sets all its attributes via
		``ctx_ddl.set_attribute``.

		Raises :exc:`SQLObjectNotFoundError` if the preference doesn't exist.
		If ``term`` is true the block is terminated with a ``/`` line.
		"""
		(connection, cursor) = self.getcursor(connection)
		cursor.execute(
			"select pre_object from ctx_preferences where pre_owner = nvl(:owner, user) and pre_name = :name",
			owner=self.owner,
			name=self.name,
		)
		pref = cursor.fetchone()
		if pref is None:
			raise SQLObjectNotFoundError(self)
		name = self.getfullname()
		lines = [
			"begin\n",
			f"\tctx_ddl.create_preference({sqlliteral(name)}, {sqlliteral(pref.pre_object)});\n",
		]
		cursor.execute(
			"select prv_attribute, prv_value from ctx_preference_values where prv_owner=nvl(:owner, user) and prv_preference=:name",
			owner=self.owner,
			name=self.name,
		)
		lines.extend(
			f"\tctx_ddl.set_attribute({sqlliteral(name)}, {sqlliteral(attr.prv_attribute)}, {sqlliteral(attr.prv_value)});\n"
			for attr in cursor
		)
		lines.append("end;\n")
		if term:
			lines.append("/\n")
		return "".join(lines)

	def dropsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that drops this preference via
		``ctx_ddl.drop_preference``. If ``term`` is true the block is
		terminated with a ``/`` line.
		"""
		terminator = "/\n" if term else ""
		return f"begin\n\tctx_ddl.drop_preference({sqlliteral(self.getfullname())});\nend;\n{terminator}"

	# FIXME: Implement :meth:`referencedby` by parsing the parameters of all domain indexes
	# and output those indexes here that reference us in any of their parameters

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` tuples for all preferences.

		If the query fails with ``ORA-00942`` (``ctx_preferences`` doesn't
		exist) nothing is yielded.
		"""
		cursor = connection.cursor()
		try:
			params = {}
			if owner is None:
				query = "select null as owner, pre_name from ctx_preferences where pre_owner=user order by pre_name"
			elif owner is ALL:
				query = "select pre_owner as owner, pre_name from ctx_preferences order by pre_owner, pre_name"
			elif isinstance(owner, str):
				query = "select decode(pre_owner, user, null, pre_owner) as owner, pre_name from ctx_preferences where pre_owner = :owner order by pre_name"
				params["owner"] = owner
			else:
				query = f"select pre_owner as owner, pre_name from ctx_preferences where pre_owner in ({', '.join(sqlstr(o) for o in owner)}) order by pre_owner, pre_name"
			cursor.execute(query, **params)
		except DatabaseError as exc:
			if exc.args[0].code != 942:
				raise
			# ORA-00942: table or view does not exist
			return iter(())
		for row in cursor:
			yield (row.pre_name, row.owner)

	@classmethod
	def objects(cls, connection, owner=None):
		"""
		Generator that yields a :class:`Preference` object for each entry
		produced by :meth:`names`.
		"""
		for (prefname, prefowner) in cls.names(connection, owner=owner):
			yield cls(prefname, prefowner, connection)
class JobClass(SchemaObject):
	"""
	Models a job class (from the ``dbms_scheduler`` package) in the database.
	"""

	type = "jobclass"

	def exists(self, connection=None):
		"""
		Return whether the data dictionary contains this job class.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"select 1 from {ddprefix}_scheduler_job_classes where job_class_name = :name"
		cursor.execute(query, name=self.name)
		rec = cursor.fetchone()
		return rec is not None

	def createsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that creates this job class via
		``sys.dbms_scheduler.create_job_class``.

		Raises :exc:`SQLObjectNotFoundError` if the job class doesn't exist.
		If ``term`` is true the block is terminated with a ``/`` line.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"select * from {ddprefix}_scheduler_job_classes where job_class_name = :name"
		cursor.execute(query, name=self.name)
		rec = cursor.fetchone()
		if rec is None:
			raise SQLObjectNotFoundError(self)
		if rec.logging_level is None:
			logging_level = "null"
		else:
			# Turn the logging level from the data dictionary into the name of a ``dbms_scheduler`` constant
			logging_level = rec.logging_level.replace(" ", "_").lower()
			logging_level = f"dbms_scheduler.logging_{logging_level}"
		code = [
			"declare\n",
			# :meth:`fixname` relies on this being the second line
			f"\tv_jobname varchar2(128) := {sqlliteral(rec.job_class_name)};\n",
			"begin\n",
			"\tsys.dbms_scheduler.create_job_class(\n",
			"\t\tjob_class_name=>v_jobname,\n",
			f"\t\tresource_consumer_group=>{sqlliteral(rec.resource_consumer_group)},\n",
			f"\t\tservice=>{sqlliteral(rec.service)},\n",
			f"\t\tlogging_level=>{logging_level},\n",
			f"\t\tlog_history=>{sqlliteral(rec.log_history)},\n",
			"\t\tcomments=>''\n",
			"\t);\n",
			"end;\n",
		]
		if term:
			code.append("/\n")
		return "".join(code)

	def dropsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that drops this job class via
		``dbms_scheduler.drop_job_class``. If ``term`` is true the block is
		terminated with a ``/`` line.
		"""
		# A bare procedure call is not an executable statement, so it must be wrapped in an anonymous block
		code = f"begin\n\tdbms_scheduler.drop_job_class(job_class_name=>{sqlliteral(self.name)});\nend;\n"
		if term:
			code += "/\n"
		return code

	def fixname(self, code):
		"""
		Return ``code`` with its second line replaced by a declaration of
		``v_jobname`` that uses the full name of ``self``. Empty code is
		returned unchanged.
		"""
		if code:
			code = code.splitlines(True)
			name = self.getfullname()
			code[1] = f"\tv_jobname varchar2(128) := {sqlliteral(name)};\n"
			code = "".join(code)
		return code

	@classmethod
	def names(cls, connection):
		"""
		Generator that yields the names of all job classes.
		"""
		cursor = connection.cursor()
		ddprefix = cursor.ddprefix()
		query = f"select job_class_name from {ddprefix}_scheduler_job_classes order by job_class_name"
		cursor.execute(query)
		for row in cursor:
			yield row.job_class_name

	@classmethod
	def objects(cls, connection):
		"""
		Generator that yields all job classes.
		"""
		for name in cls.names(connection):
			yield cls(name, connection)

	def references(self, connection=None):
		"""
		Generator that yields nothing.
		"""
		yield from ()

	def referencedby(self, connection=None):
		"""
		Generator that yields a :class:`Job` object for each scheduler job
		whose job class is ``self``.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		query = f"select decode(owner, user, null, owner) as owner, job_name from {ddprefix}_scheduler_jobs where job_class = :name"
		cursor.execute(query, name=self.name)
		for rec in cursor:
			yield Job(rec.job_name, rec.owner, connection)
class Job(OwnedSchemaObject):
	"""
	Models a scheduler job (from the ``dbms_scheduler`` package) in the
	database.
	"""

	type = "job"

	def exists(self, connection=None):
		"""
		Return whether the data dictionary contains this job.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		cursor.execute(
			f"select 1 from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name",
			owner=self.owner,
			name=self.name,
		)
		return cursor.fetchone() is not None

	def createsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that creates this job via
		``sys.dbms_scheduler.create_job`` and sets its arguments via
		``sys.dbms_scheduler.set_job_argument_value``.

		Raises :exc:`SQLObjectNotFoundError` if the job doesn't exist. If
		``term`` is true the block is terminated with a ``/`` line.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		cursor.execute(
			f"select * from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name",
			owner=self.owner,
			name=self.name,
		)
		job = cursor.fetchone()
		if job is None:
			raise SQLObjectNotFoundError(self)

		# A job with arguments is created disabled and only enabled after its arguments have been set
		if job.enabled == "TRUE" and job.number_of_arguments == 0:
			enabled = "true"
		else:
			enabled = "false"
		if job.auto_drop == "TRUE":
			auto_drop = "true"
		else:
			auto_drop = "false"

		lines = [
			"declare\n",
			# :meth:`fixname` relies on this being the second line
			f"\tv_jobname varchar2(128) := {sqlliteral(job.job_name)};\n",
			"begin\n",
			"\tsys.dbms_scheduler.create_job(\n",
			"\t\tjob_name=>v_jobname,\n",
			f"\t\tjob_type=>{sqlliteral(job.job_type)},\n",
			f"\t\tjob_action=>{sqlliteral(job.job_action)},\n",
			f"\t\tnumber_of_arguments=>{sqlliteral(job.number_of_arguments)},\n",
			f"\t\tstart_date=>{sqlliteral(job.start_date)},\n",
			f"\t\trepeat_interval=>{sqlliteral(job.repeat_interval)},\n",
			f"\t\tend_date=>{sqlliteral(job.end_date)},\n",
			f"\t\tjob_class=>{sqlliteral(job.job_class)},\n",
			f"\t\tenabled=>{enabled},\n",
			f"\t\tauto_drop=>{auto_drop},\n",
			f"\t\tcomments=>{sqlliteral(job.comments)}\n",
			"\t);\n",
		]

		cursor.execute(
			f"select argument_position, argument_type, value from {ddprefix}_scheduler_job_args where owner = nvl(:owner, user) and job_name = :job_name order by argument_position",
			owner=self.owner,
			job_name=job.job_name,
		)
		for arg in cursor:
			lines += [
				"\n",
				"\tsys.dbms_scheduler.set_job_argument_value(\n",
				"\t\tjob_name=>v_jobname,\n",
				f"\t\targument_position=>{sqlliteral(arg.argument_position)},\n",
				f"\t\targument_value=>{sqlliteral(arg.value)}\n",
				"\t);\n",
			]

		if job.enabled == "TRUE" and job.number_of_arguments and job.number_of_arguments > 0:
			lines += [
				"\n",
				"\tsys.dbms_scheduler.enable(v_jobname);\n",
			]
		lines.append("end;\n")
		if term:
			lines.append("/\n")
		return "".join(lines)

	def dropsql(self, connection=None, term=True):
		"""
		Return a PL/SQL block that drops this job via
		``dbms_scheduler.drop_job``. If ``term`` is true the block is
		terminated with a ``/`` line.
		"""
		terminator = "/\n" if term else ""
		return f"begin\n\tdbms_scheduler.drop_job(job_name=>{sqlliteral(self.getfullname())});\nend;\n{terminator}"

	def fixname(self, code):
		"""
		Return ``code`` with its second line replaced by a declaration of
		``v_jobname`` that uses the full name of ``self``. Empty code is
		returned unchanged.
		"""
		if not code:
			return code
		lines = code.splitlines(True)
		lines[1] = f"\tv_jobname varchar2(128) := {sqlliteral(self.getfullname())};\n"
		return "".join(lines)

	@classmethod
	def names(cls, connection, owner=None):
		"""
		Generator that yields ``(name, owner)`` tuples for all jobs.
		"""
		cursor = connection.cursor()
		params = {}
		if owner is None:
			query = "select null as owner, job_name from user_scheduler_jobs order by job_name"
		else:
			ddprefix = cursor.ddprefix()
			if owner is ALL:
				query = f"select owner, job_name from {ddprefix}_scheduler_jobs order by owner, job_name"
			elif isinstance(owner, str):
				query = f"select owner, job_name from {ddprefix}_scheduler_jobs where owner = :owner order by job_name"
				params["owner"] = owner
			else:
				query = f"select owner, job_name from {ddprefix}_scheduler_jobs where owner in ({', '.join(sqlstr(o) for o in owner)}) order by owner, job_name"
		cursor.execute(query, **params)
		for row in cursor:
			yield (row.job_name, row.owner)

	@classmethod
	def objects(cls, connection, owner=None):
		"""
		Generator that yields a :class:`Job` object for each entry produced
		by :meth:`names`.
		"""
		for (jobname, jobowner) in cls.names(connection, owner=owner):
			yield cls(jobname, jobowner, connection)

	def references(self, connection=None):
		"""
		Generator that yields the :class:`JobClass` of this job, unless it's
		one of the system job classes.

		Raises :exc:`SQLObjectNotFoundError` if the job doesn't exist.
		"""
		(connection, cursor) = self.getcursor(connection)
		ddprefix = cursor.ddprefix()
		cursor.execute(
			f"select job_class from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name",
			owner=self.owner,
			name=self.name,
		)
		job = cursor.fetchone()
		if job is None:
			raise SQLObjectNotFoundError(self)
		# We don't want to reference any system job classes
		system_classes = {"DEFAULT_JOB_CLASS", "DEFAULT_IN_MEMORY_JOB_CLASS", "XMLDB_NFS_JOBCLASS"}
		if not ("$" in job.job_class or job.job_class in system_classes):
			yield JobClass(job.job_class, connection)

	def referencedby(self, connection=None):
		# Oracle does not provide any information about which procedures etc.
		# the job calls, so skip the check
		# (To still be able to get objects in a usable order
		# :meth:`Connect.objects` outputs jobs last.)
		yield from ()
###
### Classes that add an ``oracle`` scheme to the urls supported by :mod:`ll.url`.
###

class OracleURLConnection(url_.Connection):
	"""
	An :class:`OracleURLConnection` exposes the objects of an Oracle database
	as a file system like hierarchy for :mod:`ll.url`.

	The following URL paths are recognized (a trailing ``/`` is ignored):

	``/``
		Directory of object types for the current user;
	``/type/``
		Directory of objects of that type for the current user;
	``/type/name.sql``
		An object of the current user;
	``/user/``
		Directory of all users;
	``/user/owner/``
		Directory of object types for a specific user;
	``/user/owner/type/``
		Directory of objects of that type for a specific user;
	``/user/owner/type/name.sql``
		An object of a specific user.
	"""

	def __init__(self, context, connection, mode):
		# ``connection`` is the connect string, ``mode`` the connect mode (or ``None`` for the default)
		self.dbconnection = connect(connection, mode=mode) if mode is not None else connect(connection)

	def open(self, url, mode="rb", encoding="utf-8", errors="strict"):
		"""
		Return an :class:`OracleFileResource` for ``url``.
		"""
		return OracleFileResource(self, url, mode, encoding, errors)

	def close(self):
		"""
		Close the database connection.
		"""
		self.dbconnection.close()

	def _type(self, url):
		# Classify ``url`` by the number of its path segments and by whether the first one is ``user``.
		# Returns one of ``"root"``, ``"allusers"``, ``"type"``, ``"user"``, ``"object"``,
		# ``"usertype"`` or ``"userobject"``; anything else raises :exc:`FileNotFoundError`.
		path = url.path
		# Ignore an empty last segment (i.e. a trailing "/")
		if path and not path[-1]:
			path = path[:-1]
		lp = len(path)
		if lp == 0:
			return "root"
		elif lp == 1:
			if path[0] == "user":
				return "allusers"
			else:
				return "type"
		elif lp == 2:
			if path[0] == "user":
				return "user"
			else:
				return "object"
		elif lp == 3:
			if path[0] == "user":
				return "usertype"
		elif lp == 4:
			if path[0] == "user":
				return "userobject"
		raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None

	def _infofromurl(self, url):
		# Split ``url`` into the tuple ``(type, owner, objecttype, name)``, where ``type`` is
		# the result of :meth:`_type` and the parts that are not in the URL are ``None``.
		type = self._type(url)
		owner = None
		objecttype = None
		name = None
		if type == "user":
			owner = url.path[1]
		elif type == "object":
			objecttype = url.path[0]
			name = url.path[1]
		elif type == "usertype":
			owner = url.path[1]
			objecttype = url.path[2]
		elif type == "userobject":
			owner = url.path[1]
			objecttype = url.path[2]
			name = url.path[3]
		if name is not None:
			# Drop the ``.sql`` file extension
			if name.lower().endswith(".sql"):
				name = name[:-4]
			name = unicodedata.normalize('NFC', name)
		return (type, owner, objecttype, name)

	def _objectfromurl(self, url):
		# Return the schema object that ``url`` refers to (without a connection).
		# Raises :exc:`ValueError` if the object type in the URL is unknown.
		(type, owner, objecttype, name) = self._infofromurl(url)
		if objecttype not in SchemaObject.name2type:
			raise ValueError(f"don't know how to handle {url!r}")
		return SchemaObject.name2type[objecttype](name, owner)

	def isdir(self, url):
		return not self._type(url).endswith("object")

	def isfile(self, url):
		return self._type(url).endswith("object")

	def mimetype(self, url):
		"""
		Return ``application/octet-stream`` for directories and
		``text/x-oracle-`` followed by the object type for objects.
		"""
		if self.isdir(url):
			return "application/octet-stream"
		type = url.path[0 if url.path[0] != "user" else 2]
		return f"text/x-oracle-{type}"

	def owner(self, url):
		"""
		Return the user name from ``url``, or the name of the user of the
		database connection if the URL doesn't contain one.
		"""
		if len(url.path) >= 2 and url.path[0] == "user" and url.path[1]:
			return url.path[1]
		else:
			c = self.dbconnection.cursor()
			c.execute("select user from dual")
			return c.fetchone()[0]

	def exists(self, url):
		"""
		Return whether ``url`` refers to a known directory or to an object
		that exists in the database.
		"""
		try:
			type = self._type(url)
		except FileNotFoundError:
			return False
		if type.endswith("object"):
			return self._objectfromurl(url).exists(self.dbconnection)
		else:
			return True

	def cdate(self, url):
		"""
		Return the creation date of the object ``url`` refers to
		(:data:`bigbang` for directories).
		"""
		if self.isdir(url):
			return bigbang
		try:
			obj = self._objectfromurl(url)
		except SQLNoSuchObjectError:
			raise FileNotFoundError(errno.ENOENT, f"no such file: {url!r}")
		return obj.cdate(self.dbconnection)

	def mdate(self, url):
		"""
		Return the modification date of the object ``url`` refers to
		(:data:`bigbang` for directories).
		"""
		if self.isdir(url):
			return bigbang
		try:
			obj = self._objectfromurl(url)
		except SQLNoSuchObjectError:
			raise FileNotFoundError(errno.ENOENT, f"no such file: {url!r}")
		return obj.udate(self.dbconnection)

	def _walk(self, cursor, url):
		# Generator that implements :meth:`walk`: ``cursor`` is the :class:`ll.url.Cursor`
		# that is yielded for each event, ``url`` is relative to ``cursor.rooturl``.
		def _event(url, event):
			cursor.url = url
			cursor.event = event
			cursor.isdir = event != "file"
			cursor.isfile = not cursor.isdir
			return cursor

		def _dir(childname):
			emitbeforedir = cursor.beforedir
			emitafterdir = cursor.afterdir
			enterdir = cursor.enterdir
			if emitbeforedir or enterdir or emitafterdir:
				childurl = url / childname
			if emitbeforedir:
				yield _event(childurl, "beforedir")
				# Refetch the flags, as they are read from ``cursor`` again after the event has been consumed
				emitbeforedir = cursor.beforedir
				emitafterdir = cursor.afterdir
				enterdir = cursor.enterdir
				cursor.restore()
			if enterdir:
				yield from self._walk(cursor, childurl)
			if emitafterdir:
				yield _event(childurl, "afterdir")
				cursor.restore()

		absurl = cursor.rooturl / url
		type = self._type(absurl)
		if type == "root": # directory of types for the current user
			for childname in sorted(SchemaObject.name2type):
				if childname not in ("tablecomment", "columncomment", "column"):
					yield from _dir(f"{childname}/")
		elif type == "type": # directory of objects of the specified type for current user
			path = absurl.path
			type = path[0]
			try:
				class_ = SchemaObject.name2type[type]
			except KeyError:
				raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None
			if issubclass(class_, OwnedSchemaObject):
				names = (name for (name, owner) in class_.names(self.dbconnection, None))
			else:
				names = class_.names(self.dbconnection)
			for name in names:
				if cursor.file:
					yield _event(url / f"{makeurl(name)}.sql", "file")
					cursor.restore()
		elif type == "allusers": # directory of all users
			for name in User.names(self.dbconnection):
				yield from _dir(f"{makeurl(name)}/")
		elif type == "user": # directory of types for a specific user
			for childname in sorted(SchemaObject.name2type):
				if childname not in ("tablecomment", "columncomment", "column"):
					yield from _dir(f"{childname}/")
		elif type == "usertype": # directory of objects of the specified type for a specific user
			path = absurl.path
			type = path[2]
			try:
				class_ = SchemaObject.name2type[type]
			except KeyError:
				raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None
			if issubclass(class_, OwnedSchemaObject):
				names = (name for (name, owner) in class_.names(self.dbconnection, path[1]))
			else:
				names = class_.names(self.dbconnection)
			for name in names:
				if cursor.file:
					yield _event(url / f"{makeurl(name)}.sql", "file")
					cursor.restore()
		else:
			raise NotADirectoryError(errno.ENOTDIR, f"Not a directory: {url}")

	def walk(self, url, beforedir=True, afterdir=False, file=True, enterdir=True):
		"""
		Return a generator that walks the directory hierarchy below ``url``.

		The flags ``beforedir``, ``afterdir``, ``file`` and ``enterdir`` are
		passed to the :class:`ll.url.Cursor` object that the generator
		yields for each event.
		"""
		cursor = url_.Cursor(url, beforedir=beforedir, afterdir=afterdir, file=file, enterdir=enterdir)
		return self._walk(cursor, url_.URL())

	def __repr__(self):
		# The database connection is stored in ``dbconnection`` (see :meth:`__init__`)
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} to {self.dbconnection.connectstring()!r} at {id(self):#x}>"
class OracleFileResource(url_.Resource):
	"""
	An :class:`OracleFileResource` wraps an Oracle database object (like a
	table, view, function, procedure etc.) in a file-like API for use with
	:mod:`ll.url`.

	Reading returns the SQL code that creates the object. Everything that is
	written is collected in memory and executed when the resource is closed.
	"""

	def __init__(self, connection, url, mode="r", encoding="utf-8", errors="strict"):
		self.connection = connection
		self.url = url
		self.mode = mode
		self.encoding = encoding
		self.errors = errors
		self.closed = False
		self.name = str(self.url)

		binary = "b" in self.mode
		if "w" in self.mode:
			# Start with an empty buffer that collects the code until :meth:`close` is called
			self.stream = io.BytesIO() if binary else io.StringIO()
		else:
			# Fill the buffer with the SQL code of the object
			obj = self.connection._objectfromurl(url)
			code = obj.createsql(self.connection.dbconnection, term=False)
			if binary:
				self.stream = io.BytesIO(code.encode(self.encoding, self.errors))
			else:
				self.stream = io.StringIO(code)

	def _checkopen(self):
		# Raise the same error as real file objects when the resource is already closed
		if self.closed:
			raise ValueError("I/O operation on closed file")

	def read(self, size=-1):
		self._checkopen()
		return self.stream.read(size)

	def write(self, data):
		self._checkopen()
		return self.stream.write(data)

	def mimetype(self):
		return self.connection.mimetype(self.url)

	def cdate(self):
		return self.connection.cdate(self.url)

	def mdate(self):
		return self.connection.mdate(self.url)

	def __iter__(self):
		lines = self.read().splitlines(True)
		return iter(lines)

	def close(self):
		"""
		Close the resource. In write mode the collected code is passed
		through the :meth:`fixname` method of the target object and then
		executed. Closing an already closed resource does nothing.
		"""
		if self.closed:
			return
		if "w" in self.mode:
			obj = self.connection._objectfromurl(self.url)
			code = self.stream.getvalue()
			if isinstance(code, bytes):
				code = code.decode(self.encoding, self.errors)
			# Make sure that the code uses the name from the URL
			code = obj.fixname(code)
			self.connection.dbconnection.cursor().execute(code)
		self.stream = None
		self.closed = True
class OracleSchemeDefinition(url_.SchemeDefinition):
	"""
	Scheme definition for ``oracle`` URLs. The connections that have been
	opened are stored in ``context.schemes["oracle"]`` with ``url.server`` as
	the key.
	"""

	def _connect(self, url, context=None, **kwargs):
		# Return a tuple with the :class:`OracleURLConnection` for ``url`` (created on demand)
		# and the keyword arguments that haven't been used.
		context = url_.getcontext(context)
		# Use one :class:`OracleURLConnection` for each ``user@host`` combination
		server = url.server
		try:
			connections = context.schemes["oracle"]
		except KeyError:
			connections = context.schemes["oracle"] = {}
		try:
			return (connections[server], kwargs)
		except KeyError:
			pass

		# The user info has the form ``user:password`` or ``user:password:mode``
		userinfo = url.userinfo.split(":")
		partcount = len(userinfo)
		if partcount == 2:
			mode = None
		elif partcount == 3:
			modes = {"sysoper": SYSOPER, "sysdba": SYSDBA, "normal": None}
			try:
				mode = modes[userinfo[2]]
			except KeyError:
				raise ValueError(f"unknown connect mode {userinfo[2]!r}")
		else:
			raise ValueError(f"illegal userinfo {url.userinfo!r}")
		connectstring = f"{userinfo[0]}/{userinfo[1]}@{url.host}"
		connection = OracleURLConnection(context, connectstring, mode)
		connections[server] = connection
		return (connection, kwargs)

	def open(self, url, mode="rb", context=None):
		"""
		Return an :class:`OracleFileResource` for ``url``.
		"""
		(connection, kwargs) = self._connect(url, context)
		return OracleFileResource(connection, url, mode, **kwargs)

	def closeall(self, context):
		"""
		Close all connections stored in ``context.schemes["oracle"]``.
		"""
		connections = context.schemes["oracle"]
		for connection in connections.values():
			connection.close()


url_.schemereg["oracle"] = OracleSchemeDefinition("oracle", usehierarchy=True, useserver=True, usefrag=False, islocal=False, isremote=True)