# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True
## Copyright 2004-2025 by LivingLogic AG, Bayreuth/Germany
## Copyright 2004-2025 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license
"""
:mod:`!ll.orasql` contains utilities for working with oracledb__:
* It allows calling procedures and functions with keyword arguments (via the
classes :class:`Procedure` and :class:`Function`).
* Query results will be put into :class:`Record` objects, where database
fields are accessible as object attributes.
* The :class:`Connection` class provides methods for iterating through the
database metadata.
* Importing this module adds support for URLs with the scheme ``oracle`` to
:mod:`ll.url`. Examples of these URLs are::
oracle://user:pwd@db/
oracle://user:pwd@db/view/
oracle://user:pwd@db/view/USER_TABLES.sql
oracle://sys:pwd:sysdba@db/
__ https://oracle.github.io/python-oracledb/
"""
import sys, os, urllib.request, urllib.parse, urllib.error, datetime, itertools
import io, errno, re, unicodedata, decimal, inspect, collections
from collections import abc
from oracledb import *
from oracledb import __version__ as __oracledb_version__
from ll import misc, url as url_
__docformat__ = "reStructuredText"
bigbang = datetime.datetime(1970, 1, 1, 0, 0, 0) # timestamp for Oracle "directories"
ALL = misc.Const("ALL", "ll.orasql") # marker object for specifying a user
###
### Exceptions
###
class SQLObjectNotFoundError(IOError):
def __init__(self, obj):
IOError.__init__(self, errno.ENOENT, f"no such {obj.type}: {obj.getfullname()}")
self.obj = obj
class SQLNoSuchObjectError(Exception):
def __init__(self, name, owner):
self.name = name
self.owner = owner
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} owner={self.owner!r} at {id(self):#x}>"
def __str__(self):
if self.owner is None:
return f"no object named {self.name!r}"
else:
return f"no object named {self.name!r} for owner {self.owner!r}"
class UnknownModeError(ValueError):
def __init__(self, mode):
self.mode = mode
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} mode={self.mode!r} at {id(self):#x}>"
def __str__(self):
return f"unknown mode {self.mode!r}"
class ConflictError(ValueError):
def __init__(self, object, message):
self.object = object
self.message = message
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object={self.object!r} message={self.message!r} at {id(self):#x}>"
def __str__(self):
return f"conflict in {self.object!r}: {self.message}"
###
### Helper classes and functions
###
def makeurl(name):
return urllib.request.pathname2url(name.encode("utf-8")).replace("/", "%2f")
def sqlliteral(value):
if value is None:
return "null"
elif isinstance(value, int):
return str(value)
elif isinstance(value, datetime.datetime):
return f"to_date('{value:%Y-%m-%d %H:%M:%S}', 'YYYY-MM-DD HH24:MI:SS')"
elif isinstance(value, str):
value = value.replace("'", "''")
return f"'{value}'"
else:
raise TypeError(f"unknown type {type(value)!r}")
[docs]
class Args(dict):
"""
An :class:`Args` object is a subclass of :class:`dict` that is used for
passing arguments to procedures and functions. Both item and attribute access
(i.e. :meth:`__getitem__` and :meth:`__getattr__`) are available. Names are
case insensitive.
"""
def __init__(self, arg=None, **kwargs):
dict.__init__(self)
self.update(arg, **kwargs)
def update(self, arg=None, **kwargs):
if arg is not None:
# if arg is a mapping use iteritems
dict.update(self, ((key.lower(), value) for (key, value) in getattr(arg, "iteritems", arg)))
dict.update(self, ((key.lower(), value) for (key, value) in kwargs.items()))
def __getitem__(self, name):
return dict.__getitem__(self, name.lower())
def __setitem__(self, name, value):
dict.__setitem__(self, name.lower(), value)
def __delitem__(self, name):
dict.__delitem__(self, name.lower())
def __getattr__(self, name):
try:
return self.__getitem__(name)
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
self.__setitem__(name, value)
def __delattr__(self, name):
try:
self.__delitem__(name)
except KeyError:
raise AttributeError(name)
def __repr__(self):
items = ", ".join(f"{key}={value!r}" for (key, value) in self.items())
return f"{self.__class__.__module__}.{self.__class__.__qualname__}({items})"
[docs]
class LOBStream:
"""
A :class:`LOBStream` object provides streamlike access to a ``BLOB`` or ``CLOB``.
"""
def __init__(self, value):
self.value = value
self.pos = 0
[docs]
def readall(self):
"""
Read all remaining data from the stream and return it.
"""
result = self.value.read(self.pos+1)
self.pos = self.value.size()
return result
[docs]
def readchunk(self):
"""
Read a chunk of data from the stream and return it. Reading is done in
optimally sized chunks.
"""
size = self.value.getchunksize()
bytes = self.value.read(self.pos+1, size)
self.pos += size
if self.pos >= self.value.size():
self.pos = self.value.size()
return bytes
[docs]
def read(self, size=None):
"""
Read ``size`` bytes/characters from the stream and return them.
If ``size`` is :const:`None` all remaining data will be read.
"""
if size is None:
return self.readall()
if size <= 0:
return self.readchunk()
data = self.value.read(self.pos+1, size)
self.pos += size
if self.pos >= self.value.size():
self.pos = self.value.size()
return data
[docs]
def reset(self):
"""
Reset the stream so that the next :meth:`read` call starts at the
beginning of the LOB.
"""
self.pos = 0
[docs]
def seek(self, offset, whence=0):
"""
Seek to the position ``offset`` in the LOB. The ``whence`` argument
is optional and defaults to ``0`` (absolute file positioning);
The other allowed value is ``1`` (seek relative to the current position).
"""
if whence == 0:
self.pos = whence
elif whence == 1:
self.pos += whence
else:
raise ValueError(f"unkown whence: {whence!r}")
size = self.value.size()
if self.pos >= size:
self.pos = size
elif self.pos < 0:
self.pos = 0
def _decodelob(value, readlobs):
if value is not None:
if readlobs is True or (isinstance(readlobs, int) and value.size() <= readlobs):
value = value.read()
else:
value = LOBStream(value)
return value
class RecordMaker:
def __init__(self, cursor):
self._readlobs = cursor.readlobs
self._index2name = tuple(d[0].lower() for d in cursor.description)
self._name2index = dict(zip(self._index2name, itertools.count()))
self._index2conv = tuple(getattr(self, d[1].name, self.DEFAULT) for d in cursor.description)
def __call__(self, *row):
row = tuple(conv(value) for (conv, value) in zip(self._index2conv, row))
return Record(self._index2name, self._name2index, row)
def DB_TYPE_CLOB(self, value):
return _decodelob(value, self._readlobs)
def DB_TYPE_NCLOB(self, value):
return _decodelob(value, self._readlobs)
def DB_TYPE_BLOB(self, value):
return _decodelob(value, self._readlobs)
def DEFAULT(self, value):
return value
[docs]
class Record(tuple, abc.Mapping):
"""
A :class:`Record` is a subclass of :class:`tuple` that is used for storing
results of database fetches and procedure and function calls. Both item and
attribute access (i.e. :meth:`__getitem__` and :meth:`__getattr__`) are
available. Field names are case insensitive.
"""
def __new__(cls, index2name, name2index, values):
record = tuple.__new__(cls, values)
record._index2name = index2name
record._name2index = name2index
return record
def __getitem__(self, arg):
if isinstance(arg, str):
arg = self._name2index[arg.lower()]
return tuple.__getitem__(self, arg)
def __getattr__(self, name):
try:
index = self._name2index[name.lower()]
except KeyError:
raise AttributeError(f"{self.__class__.__module__}.{self.__class__.__qualname__} object has no attribute {name!r}")
return tuple.__getitem__(self, index)
def ul4_getattr(self, name):
return getattr(self, name)
def ul4_hasattr(self, name):
return name.lower() in self._name2index
[docs]
def get(self, name, default=None):
"""
Return the value for the field named ``name``. If this field doesn't
exist in ``self``, return ``default`` instead.
"""
try:
index = self._name2index[name.lower()]
except KeyError:
return default
return tuple.__getitem__(self, index)
def __contains__(self, name):
return name.lower() in self._name2index
[docs]
def keys(self):
"""
Return an iterator over field names.
"""
return iter(self._index2name)
[docs]
def items(self):
"""
Return an iterator over (field name, field value) tuples.
"""
for (index, key) in enumerate(self._index2name):
yield (key, tuple.__getitem__(self, index))
[docs]
def replace(self, **kwargs):
"""
Return a new :class:`Record` with the same fields as ``self``, except
for those fields given new values by whichever keyword arguments are
specified.
"""
values = list(self)
for (key, value) in kwargs.items():
values[self._name2index[key.lower()]] = value
return self.__class__(self._index2name, self._name2index, values)
def __repr__(self):
items = ", ".join(f"{key}={value!r}" for (key, value) in self.items())
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} {items} at {id(self):#x}>"
def _repr_pretty_(self, p, cycle):
prefix = f"<{self.__class__.__module__}.{self.__class__.__qualname__}"
suffix = f"at {id(self):#x}"
if cycle:
p.text(f"{prefix} ... {suffix}>")
else:
with p.group(4, prefix, ">"):
for (key, value) in self.items():
p.breakable()
p.text(f"{key}=")
p.pretty(value)
p.breakable()
p.text(suffix)
[docs]
class ConnectionPool(ConnectionPool):
"""
:class:`ConnectionPool` is a subclass of :class:`oracledb.ConnectionPool`.
"""
def __init__(self, dsn=None, **kwargs):
super().__init__(dsn, **{"connectiontype": Connection, **kwargs})
def connectstring(self):
return f"{self.username}@{self.dsn}"
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object db={self.connectstring()!r} at {id(self):#x}>"
def sqlstr(s):
if s is None:
return "null"
else:
s = s.replace("'", "''")
return f"'{s}'"
[docs]
def owned(obj, owner):
"""
Do we own the object ``obj`` according to the owner specification
``owner``?
``owner`` can be:
:const:`None`
The current user (i.e. via the view ``USER_OBJECTS``);
:const:`ALL`
Any user (via the views ``ALL_OBJECTS`` or ``DBA_OBJECTS``);
username : string
The specified user
:class:`set` or :class:`tuple`
A set or tuple of usernames. An object must belong to any of the users
to be considered owned.
"""
# We ignore objects that have no owner (like :class:`JobClass` objects)
# we only output them if ``owner is ALL``.
if owner is None:
if not isinstance(obj, OwnedSchemaObject) or obj.owner is not None:
return False
elif owner is ALL:
return True
elif isinstance(owner, (set, tuple)):
if not isinstance(obj, OwnedSchemaObject) or not any(owned(obj, o) for o in owner):
return False
else:
if not isinstance(obj, OwnedSchemaObject) or obj.owner != owner:
return False
return True
[docs]
class Connection(Connection):
"""
:class:`Connection` is a subclass of :class:`oracledb.Connection`.
"""
[docs]
def __init__(self, *args, **kwargs):
r"""
Create a new connection. In addition to the parameters supported by
:func:`oracledb.connect` the following keyword argument is supported.
``readlobs`` : bool or integer
If ``readlobs`` is :const:`False` all cursor fetches return
:class:`LOBStream` objects for LOB objects. If ``readlobs`` is an
:class:`int` LOBs with a maximum size of ``readlobs`` will be
returned as :class:`bytes`/:class:`str` objects. If ``readlobs``
is :const:`True` all LOB values will be returned as
:class:`bytes`/:class:`str` objects.
``decimal`` : bool
If ``decimal`` is :const:`True` numbers will be returned as
:class:`decimal.Decimal` objects, else :class:`float` will be used.
Furthermore ``module`` will automatically be set to the currently running
script on the next database round-trip. To skip that, pass ``module``
yourself. Use ``None`` to get :mod:`oracledb`\s default value.
"""
if "readlobs" in kwargs:
kwargs = kwargs.copy()
self.readlobs = kwargs.pop("readlobs", False)
else:
self.readlobs = False
self.decimal = kwargs.pop("decimal", False)
super().__init__(*args, **kwargs)
if "module" not in kwargs:
module = misc.sysinfo.short_script_name[-64:]
if module == "<shell>":
module = f"<shell pid={os.getpid()}>"
self.module = module
elif kwargs["module"] is not None:
self.module = kwargs["module"]
if self.decimal:
self.outputtypehandler = self._numbersasdecimal
self.mode = kwargs.get("mode")
self._ddprefix = None # Do we have access to the ``DBA_*`` views?
self._ddprefixargs = None # Do we have access to the ``DBA_ARGUMENTS`` view (which doesn't exist in Oracle 10)?
def _numbersasdecimal(self, cursor, name, defaultType, length, precision, scale):
if defaultType is DB_TYPE_NUMBER and scale:
return cursor.var(decimal.Decimal, arraysize=cursor.arraysize)
def connectstring(self):
return f"{self.username}@{self.dsn}"
[docs]
def cursor(self, readlobs=None):
"""
Return a new cursor for this connection. For the meaning of
``readlobs`` see :meth:`__init__`.
"""
return Cursor(self, readlobs=readlobs)
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} db={self.connectstring()!r} at {id(self):#x}>"
[docs]
def tables(self, owner=None, mode="flat"):
"""
Generator that yields all table definitions in the current users schema
(or all users schemas). ``mode`` specifies the order in which tables
will be yielded:
``"create"``
Create order, inserting records into the table in this order will not
violate foreign key constraints.
``"drop"``
Drop order, deleting records from the table in this order will not
violate foreign key constraints.
``"flat"``
Unordered.
``owner`` specifies from which user tables should be yielded. It can be
:const:`None` (for the current user), :const:`ALL` (for all users
(the default)) or a user name.
Tables that are materialized views will be skipped in all cases.
"""
if mode not in ("create", "drop", "flat"):
raise UnknownModeError(mode)
cursor = self.cursor()
ddprefix = cursor.ddprefix()
tables = Table.objects(self, owner)
if mode == "flat":
yield from tables
else:
done = set()
tables = {(table.name, table.owner): table for table in tables}
def do(table):
if table not in done:
done.add(table)
query = f"""
select
ac1.table_name,
decode(ac1.owner, user, null, ac1.owner) as owner
from
{ddprefix}_constraints ac1,
{ddprefix}_constraints ac2
where
ac1.constraint_type = 'R' and
ac2.table_name = :name and
ac2.owner = nvl(:owner, user) and
ac1.r_constraint_name = ac2.constraint_name and
ac1.r_owner = ac2.owner
"""
cursor.execute(query, name=table.name, owner=table.owner)
for rec in cursor.fetchall():
try:
t2 = tables[(rec.table_name, rec.owner)]
except KeyError:
pass
else:
yield from do(t2)
yield table
for table in tables.values():
yield from do(table)
[docs]
def sequences(self, owner=None):
"""
Generator that yields sequences. ``owner`` can be :const:`None` (the
default), :const:`ALL` or a user name.
"""
return Sequence.objects(self, owner)
[docs]
def fks(self, owner=None):
"""
Generator that yields all foreign key constraints. ``owner`` can be
:const:`None` (the default), :const:`ALL` or a user name.
"""
return ForeignKey.objects(self, owner)
[docs]
def privileges(self, owner=None):
"""
Generator that yields object privileges. ``owner`` can be :const:`None`
(the default), :const:`ALL`, a user name or a set or tuple of user names.
"""
return Privilege.objects(self, owner)
[docs]
def synonyms(self, owner=None, object_owner=ALL):
"""
Generator that yields synonyms. ``owner`` and ``object_owner`` can be
:const:`None` (the default), :const:`ALL`, a user name or a set or tuple
of user names.
"""
return Privilege.objects(self, owner)
[docs]
def users(self):
"""
Generator that yields all users.
"""
return User.objects(self)
[docs]
def objects(self, owner=None, mode="create"):
"""
Generator that yields the sequences, tables, primary keys, foreign keys,
table and columns comments, unique constraints, indexes, views, functions,
procedures, packages, types and jobsin the current users schema
(or all users schemas) in a specified order.
``mode`` specifies the order in which objects will be yielded:
``"create"``
Create order, i.e. recreating the objects in this order will not lead
to errors;
``"drop"``
Drop order, i.e. dropping the objects in this order will not lead to
errors;
``"flat"``
Unordered.
``owner`` specifies from which schema objects should be yielded.
For more information see :func:`owned`.
"""
if mode not in {"create", "drop", "flat"}:
raise UnknownModeError(mode)
cursor = self.cursor()
# Prefetch all dependency info to speed up calls to each objects
# :meth:`references` or :meth:`referencedby`.
dependencies = Dependencies(cursor)
done = set()
def do(obj):
if mode == "create":
for subobj in obj.referencesall(self, done, dependencies):
if owned(subobj, owner):
yield subobj
elif mode == "drop":
for subobj in obj.referencedbyall(self, done, dependencies):
if owned(subobj, owner):
yield subobj
else:
if obj not in done:
done.add(obj)
if owned(obj, owner):
yield obj
def dosequences():
for sequence in Sequence.objects(self, owner):
yield from do(sequence)
def dotables():
for table in Table.objects(self, owner):
if mode == "create" or mode == "flat":
yield from do(table)
# Table comment
yield table.comment()
# Column comments
for comment in table.comments():
# No dependency checks necessary, but use ``do`` anyway (this does no queries)
yield from do(comment)
# Primary key
pk = table.pk()
if pk is not None:
yield from do(pk)
if mode == "drop":
yield from do(table)
def dorest():
# Since Oracle doesn't give as any dependency information about jobs,
# output them last (in "create" mode) to be on the safe side.
for type in (CheckConstraint, UniqueConstraint, ForeignKey, Preference, Index, Synonym, View, MaterializedView, Function, Procedure, Package, PackageBody, Type, TypeBody, Trigger, JavaSource, Job):
for obj in type.objects(self, owner):
yield from do(obj)
funcs = [dosequences, dotables, dorest]
if mode == "drop":
funcs = reversed(funcs)
for func in funcs:
yield from func()
def _getobject(self, name, owner=None):
cursor = self.cursor()
ddprefix = cursor.ddprefix()
query = f"""
select
object_name,
decode(owner, user, null, owner) as owner,
object_type
from
{ddprefix}_objects
where
object_name = :object_name and
owner = nvl(:owner, user)
"""
cursor.execute(query, object_name=name, owner=owner)
rec = cursor.fetchone()
if rec is not None:
type = rec.object_type.lower()
try:
cls = SchemaObject.name2type[type]
except KeyError:
raise TypeError(f"type {type} not supported")
else:
return cls(rec.object_name, rec.owner, self)
raise SQLNoSuchObjectError(name, owner)
[docs]
def objects_named(self, name, owner=None):
"""
Return all objects named ``name`` from the schema. If ``owner`` is
:const:`None` the current schema is queried, else the specified one is
used. ``name`` and ``owner`` are treated case insensitively.
There might be multiple object with the same name, if these names only
differ in casing. Also there *will* be multiple object with the same name
for packages and package bodies.
"""
if isinstance(name, str):
name = str(name)
if isinstance(owner, str):
owner = str(owner)
cursor = self.cursor()
ddprefix = cursor.ddprefix()
if "." in name:
name = name.split(".")
query = f"""
select
decode(owner, user, null, owner) as owner,
object_name || '.' || procedure_name as object_name,
decode(
(
select
count(*)
from
{ddprefix}_arguments
where
owner = nvl(:owner, user) and
lower(object_name) = lower(:object_name) and
lower(package_name) = lower(:package_name) and
argument_name is null
),
0,
'procedure',
'function'
) as object_type
from
{ddprefix}_procedures
where
lower(procedure_name) = lower(:object_name) and
lower(owner) = lower(nvl(:owner, user)) and
lower(object_name) = lower(:package_name)
"""
cursor.execute(query, object_name=name[1], package_name=name[0], owner=owner)
else:
query = f"""
select
object_name,
decode(owner, user, null, owner) as owner,
object_type
from
{ddprefix}_objects
where
lower(object_name) = lower(:object_name) and
lower(owner) = lower(nvl(:owner, user))
"""
cursor.execute(query, object_name=name, owner=owner)
for rec in cursor:
type = rec.object_type.lower()
try:
cls = SchemaObject.name2type[type]
except KeyError:
raise TypeError(f"type {type} not supported")
else:
yield cls(rec.object_name, rec.owner, self)
[docs]
def object_named(self, name, owner=None):
"""
Return the object named ``name`` from the schema. If ``owner`` is
:const:`None` the current schema is queried, else the specified one is
used. ``name`` and ``owner`` are treated case insensitively.
If there are multiple objects with the same name, which one gets returned
is undefined.
If there is no such object an :exc:`SQLNoSuchObjectError` exception will
be raised.
"""
result = misc.first(self.objects_named(name, owner))
if result is None:
raise SQLNoSuchObjectError(name, owner)
return result
# For backwards compatibility
def getobject(self, name, owner=None):
return self.object_named(name, owner=None)
connect_oracledb = connect
[docs]
def connect(*args, **kwargs):
"""
Create a connection to the database and return a :class:`Connection` object.
"""
return Connection(*args, **kwargs)
create_pool_oracledb = create_pool
[docs]
def create_pool(*args, **kwargs):
"""
Create a connection pool and return a :class:`ConnectionPool` object.
"""
return create_pool_oracledb(*args, **{"pool_class": ConnectionPool, **kwargs})
[docs]
class Cursor(Cursor):
"""
A subclass of the cursor class in :mod:`oracledb`. The "fetch" methods
will return records as :class:`Record` objects and ``LOB`` values will be
returned as :class:`LOBStream` objects or :class:`str`/:class:`bytes` objects
(depending on the cursors :attr:`readlobs` attribute).
"""
[docs]
def __init__(self, connection, readlobs=None):
"""
Return a new cursor for the connection ``connection``. For the meaning
of ``readlobs`` see :meth:`Connection.__init__`.
"""
super().__init__(connection)
self.readlobs = (readlobs if readlobs is not None else connection.readlobs)
[docs]
def ddprefix(self):
"""
Return whether the user has access to the ``DBA_*`` views (``"dba"``) or
not (``"all"``).
"""
if self.connection._ddprefix is None:
try:
self.execute("select /*+FIRST_ROWS(1)*/ table_name from dba_tables")
except DatabaseError as exc:
if exc.args[0].code == 942: # ORA-00942: table or view does not exist
self.connection._ddprefix = "all"
else:
raise
else:
self.connection._ddprefix = "dba"
return self.connection._ddprefix
[docs]
def ddprefixargs(self):
"""
Return whether the user has access to the ``DBA_ARGUMENTS`` view
(``"dba"``) or not (``"all"``).
"""
# This method is separate from :meth:`ddprefix`, because Oracle 10 doesn't
# have a ``DBA_ARGUMENTS`` view.
if self.connection._ddprefixargs is None:
try:
self.execute("select /*+FIRST_ROWS(1)*/ object_name from dba_arguments")
except DatabaseError as exc:
if exc.args[0].code == 942: # ORA-00942: table or view does not exist
self.connection._ddprefixargs = "all"
else:
raise
else:
self.connection._ddprefixargs = "dba"
return self.connection._ddprefixargs
def execute(self, statement, parameters=None, **kwargs):
if parameters is not None:
result = super().execute(statement, parameters, **kwargs)
else:
result = super().execute(statement, **kwargs)
if self.description is not None:
self.rowfactory = RecordMaker(self)
return result
def executemany(self, statement, parameters):
result = super().executemany(statement, parameters)
if self.description is not None:
self.rowfactory = RecordMaker(self)
return result
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} statement={self.statement!r} at {id(self):#x}>"
###
### Classes used for database meta data
###
[docs]
class Dependencies:
"""
Collects dependencies of objects that are not in system schemas.
This is used to speed up :meth:`Connection.objects` with ``mode="create"``
or ``mode="drop"``.
"""
def __init__(self, cursor):
ddprefix = cursor.ddprefix()
cursor.execute(
f"""
select
type,
decode(owner, user, null, owner) as owner,
name,
referenced_type,
decode(referenced_owner, user, null, referenced_owner) as referenced_owner,
referenced_name
from
{ddprefix}_dependencies
where
type != 'NON-EXISTENT' and
owner not in (select username from {ddprefix}_users where oracle_maintained = 'Y') and
referenced_owner not in (select username from {ddprefix}_users where oracle_maintained = 'Y')
order by
owner,
name,
referenced_owner,
referenced_name
"""
)
self.dependencies = cursor.fetchall()
self.references = collections.defaultdict(list)
self.referencedby = collections.defaultdict(list)
for rec in self.dependencies:
self.references[(rec.type, rec.owner, rec.name)].append(rec)
self.referencedby[(rec.referenced_type, rec.referenced_owner, rec.referenced_name)].append(rec)
[docs]
class MixinNormalDates:
"""
Mixin class that provides methods for determining creation and modification
dates for objects.
"""
def cdate(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
sys_extract_utc(from_tz(cast(created as timestamp), dbtimezone))
from
{ddprefix}_objects
where
lower(object_type) = :type and
object_name = :name and
owner = nvl(:owner, user)
"""
cursor.execute(query, type=self.__class__.type, name=self.name, owner=self.owner)
row = cursor.fetchone()
if row is None:
raise SQLObjectNotFoundError(self)
return row[0]
def udate(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
sys_extract_utc(from_tz(cast(last_ddl_time as timestamp), dbtimezone))
from
{ddprefix}_objects
where
lower(object_type) = :type and
object_name = :name and
owner = nvl(:owner, user)
"""
cursor.execute(query, type=self.__class__.type, name=self.name, owner=self.owner)
row = cursor.fetchone()
if row is None:
raise SQLObjectNotFoundError(self)
return row[0]
[docs]
class MixinCodeSQL:
"""
Mixin class that provides methods returning the create and drop statements
for various objects.
"""
def exists(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_source
where
type = :type and
owner = nvl(:owner, user) and
name = :name
"""
cursor.execute(query, type=self.__class__.type.upper(), owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
text
from
{ddprefix}_source
where
type = :type and
owner = nvl(:owner, user) and
name = :name
order by
line
"""
cursor.execute(query, type=self.__class__.type.upper(), owner=self.owner, name=self.name)
code = "\n".join((rec.text or "").rstrip() for rec in cursor) # sqlplus strips trailing spaces when executing SQL scripts, so we do that too
if not code:
return ""
code = " ".join(code.split(None, 1)) # compress "PROCEDURE FOO"
code = code.strip()
type = self.__class__.type
code = code[code.lower().find(type)+len(type):].strip() # drop "procedure" etc.
# drop our own name (for triggers this includes the schema name)
if code.startswith('"'):
code = code[code.find('"', 1)+1:]
else:
while code and (code[0].isalnum() or code[0] in "_$."):
code = code[1:]
while code.startswith(" "):
code = code[1:]
code = f"create or replace {type} {self.getfullname()}{code}\n"
if term:
code += "\n/\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
if self.owner is not None:
name = f"{self.owner}.{self.name}"
else:
name = self.name
code = f"drop {self.__class__.type} {name}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
if code:
code = code.split(None, 5)
code = f"create or replace {code[3]} {self.getfullname()}\n{code[5]}"
return code
def getfullname(name, owner):
parts = []
if owner is not None:
if owner != owner.upper() or not all(c.isalnum() or c == "_" for c in owner):
part = f'"{owner}"'
parts.append(owner)
for part in name.split("."):
if part != part.upper() or not all(c.isalnum() or c == "_" for c in part):
part = f'"{part}"'
parts.append(part)
return ".".join(parts)
class _SchemaObject_meta(type):
def __new__(mcl, name, bases, dict):
typename = None
if "type" in dict and name not in {"SchemaObject", "OwnedSchemaObject"}:
typename = dict["type"]
cls = type.__new__(mcl, name, bases, dict)
if typename is not None:
SchemaObject.name2type[typename] = cls
return cls
[docs]
class SchemaObject(object, metaclass=_SchemaObject_meta):
"""
The base class for all Python classes modelling schema objects in the
database.
"""
name2type = {} # maps the Oracle type name to the Python class (populated by the metaclass)
def __init__(self, name, connection=None):
self.name = name
self.connection = connection
def __repr__(self):
if self.connection is not None:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} connection={self.connectstring!r} at {id(self):#x}>"
else:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} at {id(self):#x}>"
def __str__(self):
return f"{self.type} {self.name}"
def __eq__(self, other):
return self.__class__ is other.__class__ and self.name == other.name
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self.__class__.__name__) ^ hash(self.name)
def getfullname(self):
return getfullname(self.name, None)
[docs]
@misc.notimplemented
def createsql(self, connection=None, term=True):
"""
Return SQL code to create this object.
"""
[docs]
@misc.notimplemented
def dropsql(self, connection=None, term=True):
"""
Return SQL code to drop this object
"""
[docs]
@misc.notimplemented
def fixname(self, code):
"""
Replace the name of the object in the SQL code ``code`` with
the name of ``self``.
"""
[docs]
@misc.notimplemented
def exists(self, connection=None):
"""
Return whether the object ``self`` really exists in the database
specified by ``connection``.
"""
[docs]
def cdate(self, connection=None):
"""
Return a :class:`datetime.datetime` object with the creation date of
``self`` in the database specified by ``connection`` (or
:const:`None` if that information is not available).
"""
return None
[docs]
def udate(self, connection=None):
"""
Return a :class:`datetime.datetime` object with the last modification
date of ``self`` in the database specified by ``connection``
(or :const:`None` if that information is not available).
"""
return None
[docs]
def references(self, connection=None, dependencies=None):
"""
Objects directly used by ``self``.
If ``connection`` is not :const:`None` it will be used as the database
connection from which to fetch data. If ``connection`` is :const:`None`
the connection from which ``self`` has been extracted will be used. If
there is not such connection, you'll get an exception.
``dependencies`` can be a :class:`Dependencies` object, which will speed
up this method call, since this object already contains all dependencies.
"""
yield from ()
[docs]
def referencesall(self, connection=None, done=None, dependencies=None):
"""
All objects used by ``self`` (recursively).
For the meaning of ``connection`` and ``dependencies`` see
:meth:`references`.
``done`` is used internally and shouldn't be passed.
"""
if done is None:
done = set()
if self not in done:
done.add(self)
for obj in self.references(connection, dependencies):
yield from obj.referencesall(connection, done, dependencies)
yield self
[docs]
def referencedby(self, connection=None, dependencies=None):
"""
Objects using ``self``.
For the meaning of ``connection`` and ``dependencies`` see
:meth:`references`.
"""
yield from ()
[docs]
def referencedbyall(self, connection=None, done=None, dependencies=None):
"""
All objects depending on ``self`` (recursively).
For the meaning of ``connection`` and ``dependencies`` see
:meth:`references`.
``done`` is used internally and shouldn't be passed.
"""
if done is None:
done = set()
if self not in done:
done.add(self)
for obj in self.referencedby(connection):
yield from obj.referencedbyall(connection, done)
yield self
def getconnection(self, connection):
if connection is None:
connection = self.connection
if connection is None:
raise TypeError("no connection available")
return connection
def getcursor(self, connection):
connection = self.getconnection(connection)
return (connection, connection.cursor())
def getconnectstring(self):
if self.connection:
return self.connection.connectstring()
return None
connectstring = property(getconnectstring)
[docs]
class OwnedSchemaObject(SchemaObject):
"""
The base class for all Python classes modelling schema objects in the
database.
"""
name2type = {} # maps the Oracle type name to the Python class (populated by the metaclass)
def __init__(self, name, owner=None, connection=None):
self.name = name
self.owner = owner
self.connection = connection
def __repr__(self):
if self.owner is not None:
if self.connection is not None:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} owner={self.owner!r} connection={self.connectstring!r} at {id(self):#x}>"
else:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} owner={self.owner!r} at {id(self):#x}>"
else:
return super().__repr__()
def __str__(self):
if self.owner is not None:
return f"{self.type} {self.name} @ {self.owner}"
else:
return super().__str__()
def __eq__(self, other):
return self.__class__ is other.__class__ and self.name == other.name and self.owner == other.owner
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self.__class__.__name__) ^ hash(self.name) ^ hash(self.owner)
def getfullname(self):
return getfullname(self.name, self.owner)
def references(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
if dependencies is not None:
key = (self.type.upper(), self.owner, self.name)
records = dependencies.references[key]
else:
ddprefix = cursor.ddprefix()
query = f"""
select
referenced_type,
decode(referenced_owner, user, null, referenced_owner) as referenced_owner,
referenced_name
from
{ddprefix}_dependencies
where
type=upper(:type) and
name=:name and
owner=nvl(:owner, user) and
type != 'NON-EXISTENT'
order by
referenced_owner,
referenced_name
"""
cursor.execute(query, type=self.type, name=self.name, owner=self.owner)
records = cursor.fetchall()
for rec in records:
try:
cls = SchemaObject.name2type[rec.referenced_type.lower()]
except KeyError:
pass # FIXME: Issue a warning?
else:
yield cls(rec.referenced_name, rec.referenced_owner, connection)
def referencedby(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
if dependencies is not None:
key = (self.type.upper(), self.owner, self.name)
records = dependencies.referencedby[key]
else:
ddprefix = cursor.ddprefix()
query = f"""
select
type,
decode(owner, user, null, owner) as owner,
name
from
{ddprefix}_dependencies
where
referenced_type = :type and
referenced_name = :name and
referenced_owner = nvl(:owner, user) and
type != 'NON-EXISTENT'
order by
owner,
name
"""
cursor.execute(query, type=self.type.upper(), name=self.name, owner=self.owner)
records = cursor.fetchall()
for rec in records:
try:
type = SchemaObject.name2type[rec.type.lower()]
except KeyError:
pass # FIXME: Issue a warning?
else:
yield type(rec.name, rec.owner, connection)
[docs]
@classmethod
def names(cls, connection, owner=None):
"""
Generator that yields the names of all objects of this type. The argument
``owner`` specifies whose objects are yielded. For more information
see :func:`owned`.
Names will be in ascending order.
"""
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
if owner is None:
query = """
select
null as owner,
object_name
from
user_objects
where
object_type = :type and
object_name not like 'BIN$%' and
object_name not like 'DR$%'
order by
object_name
"""
cursor.execute(query, type=cls.type.upper())
elif owner is ALL:
query = f"""
select
decode(owner, user, null, owner) as owner,
object_name
from
{ddprefix}_objects
where
object_type = :type and
object_name not like 'BIN$%' and
object_name not like 'DR$%'
order by
owner,
object_name
"""
cursor.execute(query, type=cls.type.upper())
elif isinstance(owner, str):
query = f"""
select
decode(owner, user, null, owner) as owner,
object_name
from
{ddprefix}_objects
where
object_type = :type and
object_name not like 'BIN$%' and
object_name not like 'DR$%' and
owner = :owner
order by
owner,
object_name
"""
cursor.execute(query, type=cls.type.upper(), owner=owner)
else:
query = f"""
select
decode(owner, user, null, owner) as owner,
object_name
from
{ddprefix}_objects
where
object_type = :type and
object_name not like 'BIN$%' and
object_name not like 'DR$%' and
owner in ({', '.join(sqlstr(o) for o in owner)})
order by
owner,
object_name
"""
cursor.execute(query, type=cls.type.upper())
for row in cursor:
yield (row.object_name, row.owner)
[docs]
@classmethod
def objects(cls, connection, owner=None):
"""
Generator that yields all objects of this type in the current users schema.
The argument ``owner`` specifies whose objects are yielded. For more
information see :func:`owned`.
"""
for name in cls.names(connection, owner):
yield cls(name[0], name[1], connection)
[docs]
def synonyms(self, connection=None):
"""
Generator that yields all synonyms for this object.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
synonym_name
from
{ddprefix}_synonyms
where
table_owner=nvl(:owner, user) and
table_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor:
yield Synonym(rec.synonym_name, rec.owner, connection)
[docs]
def privileges(self, connection=None):
"""
Generator that yields all privileges on this object.
"""
(connection, cursor) = self.getcursor(connection)
if self.owner is None:
query = """
select
decode(owner, user, null, owner) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
user_tab_privs
where
owner = user and
table_name = :name
order by
grantor,
privilege,
grantee
"""
cursor.execute(query, name=self.name)
else:
ddprefix = cursor.ddprefix()
# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
if ddprefix == "all":
query = """
select
decode(table_schema, user, null, table_schema) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
all_tab_privs
where
table_schema = nvl(:owner, user) and
table_name = :name
order by
grantor,
table_name,
privilege,
grantee
"""
else:
query = """
select
decode(owner, user, null, owner) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
dba_tab_privs
where
owner = nvl(:owner, user) and
table_name = :name
order by
grantor,
table_name,
privilege,
grantee
"""
cursor.execute(query, name=self.name, owner=self.owner)
for rec in cursor:
yield Privilege(rec.privilege, rec.object, rec.grantor, rec.grantee, rec.owner, connection)
[docs]
class Sequence(MixinNormalDates, OwnedSchemaObject):
"""
Models a sequence in the database.
"""
type = "sequence"
def _createsql(self, connection, term, copyvalue):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
*
from
{ddprefix}_sequences
where
sequence_owner = nvl(:owner, user) and
sequence_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
code = f"create sequence {self.getfullname()}\n"
code += f"\tincrement by {rec.increment_by}\n"
if copyvalue:
code += f"\tstart with {rec.last_number + rec.increment_by}\n"
else:
code += f"\tstart with {rec.min_value}\n"
code += f"\tmaxvalue {rec.max_value}\n"
code += f"\tminvalue {rec.min_value}\n"
cycle = "" if rec.cycle_flag == "Y" else "no"
code += f"\t{cycle}cycle\n"
if rec.cache_size:
code += f"\tcache {rec.cache_size}\n"
else:
code += "\tnocache\n"
order = "" if rec.order_flag == "Y" else "no"
code += f"\t{order}order"
if term:
code += ";\n"
else:
code += "\n"
return code
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_sequences
where
sequence_owner = nvl(:owner, user) and
sequence_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
return self._createsql(connection, term, False)
[docs]
def createsqlcopy(self, connection=None, term=True):
"""
Return SQL code to create an identical copy of this sequence.
"""
return self._createsql(connection, term, True)
def dropsql(self, connection=None, term=True):
code = f"drop sequence {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 3)
code = f"create sequence {self.getfullname()}\n{code[3]}"
return code
def references(self, connection=None, dependencies=None):
# Shortcut: a sequence doesn't depend on anything
yield from ()
def _columntype(rec, data_precision=None, data_scale=None, char_length=None):
ftype = rec.data_type.lower()
if data_precision is None:
data_precision = rec.data_precision
if data_scale is None:
data_scale = rec.data_scale
if char_length is None:
char_length = rec.char_length
fsize = data_precision
fprec = data_scale
if ftype == "number" and fprec == 0 and fsize is None:
ftype = "integer"
elif ftype == "number" and fprec is None and fsize is None:
ftype = "number"
elif ftype == "number" and fprec == 0:
ftype = f"number({fsize})"
elif ftype == "number":
ftype = f"number({fsize}, {fprec})"
elif ftype == "raw":
ftype = f"raw({rec.data_length})"
else:
if char_length != 0:
fsize = char_length
if fsize is not None:
ftype += f"({fsize}"
if rec.char_used == "B":
ftype += " byte"
elif rec.char_used == "C":
ftype += " char"
if fprec is not None:
ftype += f", {fprec}"
ftype += ")"
return ftype
def _columndefault(rec):
if rec.data_default is not None and rec.data_default.strip() != "null":
return rec.data_default.strip()
return "null"
[docs]
class Table(MixinNormalDates, OwnedSchemaObject):
"""
Models a table in the database.
"""
type = "table"
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
if self.ismview(connection):
return ""
ddprefix = cursor.ddprefix()
# Find the fields that where used for an inline primary key constraint,
# as we want to regenerate it as part of the create table statement
query = f"""
select
column_name
from
{ddprefix}_constraints c,
{ddprefix}_cons_columns cc
where
c.constraint_type = 'P' and
c.generated = 'GENERATED NAME' and
c.owner = nvl(:owner, user) and
c.table_name = :name and
c.constraint_name = cc.constraint_name
"""
cursor.execute(query, owner=self.owner, name=self.name)
_inlinepkfields = {rec.column_name for rec in cursor}
(organization, logging, compression) = self._info(connection)
query = f"""
select
c.column_name,
c.data_type,
c.data_precision,
c.data_scale,
c.char_length,
c.char_used,
c.data_default,
c.nullable,
decode(l.compression, 'NO', null, 'NONE', null, l.compression) as compression
from
{ddprefix}_tab_columns c,
{ddprefix}_lobs l
where
c.owner = nvl(:owner, user) and
c.table_name = :name and
c.owner = l.owner(+) and
c.table_name = l.table_name(+) and
c.column_name = l.column_name(+)
order by
column_id asc
"""
cursor.execute(query, owner=self.owner, name=self.name)
recs = cursor.fetchall()
code = [f"create table {self.getfullname()}\n(\n"]
for (i, rec) in enumerate(recs):
if i:
code.append(",\n")
code.append(f"\t{getfullname(rec.column_name, None)} {_columntype(rec)}")
default = _columndefault(rec)
if default != "null":
code.append(f" default {default}")
if rec.nullable == "N":
code.append(" not null")
if rec.column_name in _inlinepkfields:
code.append(" primary key")
code.append("\n)")
if not logging:
code.append(" nologging")
if compression is not None:
if compression == "BASIC":
code.append(" compress")
elif compression == "ADVANCED":
code.append(" row store compress advanced")
else:
code.append(f"column store compress for {compression}")
for rec in recs:
if rec.compression:
code.append(f"\nlob({rec.column_name}) store as securefile (compress {rec.compression.lower()})")
if term:
code.append(";")
code.append("\n")
return "".join(code)
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
if self.ismview(connection):
return False
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_tables
where
owner = nvl(:owner, user) and
table_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def dropsql(self, connection=None, term=True):
if self.ismview(connection):
return ""
code = f"drop table {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 3)
code = f"create table {self.getfullname()}\n{code[3]}"
return code
[docs]
def mview(self, connection=None):
"""
The materialized view this table belongs to (or :const:`None` if it's a
real table).
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
mview_name
from
{ddprefix}_mviews
where
owner = nvl(:owner, user) and
mview_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is not None:
rec = MaterializedView(self.name, self.owner, connection)
return rec
[docs]
def ismview(self, connection=None):
"""
Is this table a materialized view?
"""
return self.mview(connection) is not None
def _info(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
logging,
iot_type,
compress_for
from
{ddprefix}_tables
where
owner = nvl(:owner, user) and
table_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
return ("heap" if rec.iot_type is None else "index", rec.logging == "YES", rec.compress_for)
[docs]
def organization(self, connection=None):
"""
Return the organization of this table: either ``"heap"`` (for "normal"
tables) or ``"index"`` (for index organized tables).
"""
return self._info(connection)[0]
[docs]
def logging(self, connection=None):
"""
Return whether the table is in logging mode or not.
"""
return self._info(connection)[1]
[docs]
def compression(self, connection=None):
"""
Return the compression mode of the table.
(``None``, ``"BASIC"`` or ``"ADVANCED"``).
"""
return self._info(connection)[2]
@classmethod
def names(cls, connection, owner=None):
# Skip tables that are materialized views
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
if owner is None:
query = """
select
null as owner,
table_name
from
user_tables
where
table_name not like 'BIN$%' and
table_name not like 'DR$%'
minus
select
null as owner,
mview_name as table_name
from
user_mviews
order by
table_name
"""
cursor.execute(query)
elif owner is ALL:
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name
from
{ddprefix}_tables
where
table_name not like 'BIN$%' and
table_name not like 'DR$%'
minus
select decode(owner, user, null, owner) as owner,
mview_name as table_name
from
{ddprefix}_mviews
order by
owner,
table_name
"""
cursor.execute(query)
elif isinstance(owner, str):
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name
from
{ddprefix}_tables
where
table_name not like 'BIN$%' and
table_name not like 'DR$%'
minus
select decode(owner, user, null, owner) as owner,
mview_name as table_name
from
{ddprefix}_mviews
order by
owner,
table_name
"""
cursor.execute(query, owner=owner)
else:
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name
from
{ddprefix}_tables
where
table_name not like 'BIN$%' and
table_name not like 'DR$%' and
owner in ({', '.join(sqlstr(o) for o in owner)})
minus
select
decode(owner, user, null, owner) as owner,
mview_name as table_name
from
{ddprefix}_mviews
where
owner in ({', '.join(sqlstr(o) for o in owner)})
order by
owner,
table_name
"""
cursor.execute(query)
for row in cursor:
yield (row.table_name, row.owner)
[docs]
def columns(self, connection=None):
"""
Generator that yields all column objects of this table.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
column_name
from
{ddprefix}_tab_columns
where
owner=nvl(:owner, user) and
table_name=:name
order by
column_id
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor:
yield Column(f"{self.name}.{rec.column_name}", self.owner, connection)
[docs]
def records(self, connection=None):
"""
Generator that yields all records of this table.
"""
(connection, cursor) = self.getcursor(connection)
query = f"select * from {self.getfullname()}"
cursor.execute(query)
return iter(cursor)
def _iterconstraints(self, connection, cond):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_type,
constraint_name
from
{ddprefix}_constraints
where
generated = 'USER NAME' and
constraint_type {cond} and
owner = nvl(:owner, user) and
table_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
types = {"P": PrimaryKey, "U": UniqueConstraint, "R": ForeignKey, "C": CheckConstraint}
for rec in cursor:
yield types[rec.constraint_type](rec.constraint_name, rec.owner, connection)
[docs]
def constraints(self, connection=None):
"""
Generator that yields all constraints for this table.
"""
return self._iterconstraints(connection, "in ('P', 'U', 'R', 'C')")
[docs]
def pk(self, connection=None):
"""
Return the primary key constraint for this table (or :const:`None` if the
table has no primary key constraint).
"""
return misc.first(self._iterconstraints(connection, "= 'P'"), None)
[docs]
def fks(self, connection=None):
"""
Return the foreign key constraints for this table.
"""
return self._iterconstraints(connection, "= 'R'")
[docs]
def uniques(self, connection=None):
"""
Return the unique constraints for this table.
"""
return self._iterconstraints(connection, "= 'U'")
[docs]
def checks(self, connection=None):
"""
Return the unique constraints for this table.
"""
return self._iterconstraints(connection, "= 'C'")
def references(self, connection=None, dependencies=None):
connection = self.getconnection(connection)
# A table doesn't depend on anything ...
mview = self.mview(connection)
if mview is not None:
# ... unless it was created by a materialized view, in which case it depends on the view
yield mview
def referencedby(self, connection=None, dependencies=None):
if not self.ismview(connection):
yield self.comment(connection)
yield from self.comments(connection)
yield from self.constraints(connection)
for obj in super().referencedby(connection):
# skip the materialized view
if not isinstance(obj, MaterializedView) or obj.name != self.name or obj.owner != self.owner:
yield obj
[docs]
class Constraint(OwnedSchemaObject):
"""
Base class of all constraints (primary key constraints, foreign key
constraints, unique constraints and check constraints).
"""
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_constraints
where
constraint_type = :type and
constraint_name = :name and
owner = nvl(:owner, user)
"""
cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
rec = cursor.fetchone()
return rec is not None
def cdate(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
sys_extract_utc(from_tz(cast(last_change as timestamp), dbtimezone))
from
{ddprefix}_constraints
where
constraint_type = :type and
constraint_name = :name and
owner = nvl(:owner, user)
"""
cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
return None # we can't give a create date, only a change date, so return :const:`None` here
def udate(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
sys_extract_utc(from_tz(cast(last_change as timestamp), dbtimezone))
from
{ddprefix}_constraints
where
constraint_type=:type and
constraint_name=:name and
owner=nvl(:owner, user)
"""
cursor.execute(query, type=self.constraint_type, name=self.name, owner=self.owner)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
return rec[0]
def _sql(self, connection, term, command):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
table_name
from
{ddprefix}_constraints
where
constraint_type = :type and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
tablename = getfullname(rec.table_name, self.owner)
checkname = getfullname(self.name, None)
code = f"alter table {tablename} {command} constraint {checkname}"
if term:
code += ";\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
return self._sql(connection, term, "drop")
def enablesql(self, connection=None, term=True):
return self._sql(connection, term, "enable")
def disablesql(self, connection=None, term=True):
return self._sql(connection, term, "disable")
[docs]
def isenabled(self, connection=None):
"""
Return whether this constraint is enabled.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
status
from
{ddprefix}_constraints
where
constraint_type = :type and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec[0] == "ENABLED"
@classmethod
def names(cls, connection, owner=None):
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
if owner is None:
query = """
select
null as owner,
constraint_name
from
user_constraints
where
generated = 'USER NAME' and
constraint_type = :type and
constraint_name not like 'BIN$%'
order by
constraint_name
"""
cursor.execute(query, type=cls.constraint_type)
elif owner is ALL:
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name
from
{ddprefix}_constraints
where
generated = 'USER NAME' and
constraint_type = :type and
constraint_name not like 'BIN$%'
order by
owner,
constraint_name
"""
cursor.execute(query, type=cls.constraint_type)
elif isinstance(owner, str):
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name
from
{ddprefix}_constraints
where
generated = 'USER NAME' and
constraint_type = :type and
constraint_name not like 'BIN$%' and
owner = :owner
order by
owner,
constraint_name
"""
cursor.execute(query, type=cls.constraint_type, owner=owner)
else:
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name
from
{ddprefix}_constraints
where
generated = 'USER NAME' and
constraint_type = :type and
constraint_name not like 'BIN$%' and
owner in ({', '.join(sqlstr(o) for o in owner)})
order by
owner,
constraint_name
"""
cursor.execute(query, type=cls.constraint_type)
for rec in cursor:
yield (rec.constraint_name, rec.owner)
def fixname(self, code):
code = code.split(None, 6)
code = f"alter table {code[2]} add constraint {self.getfullname()} {code[6]}"
return code
[docs]
def table(self, connection=None):
"""
Return the :class:`Table` ``self`` belongs to.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
table_name
from
{ddprefix}_constraints
where
constraint_type=:type and
owner=nvl(:owner, user) and
constraint_name=:name
"""
cursor.execute(query, type=self.constraint_type, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return Table(rec.table_name, self.owner, connection)
[docs]
class PrimaryKey(Constraint):
"""
Models a primary key constraint in the database.
"""
type = "pk"
constraint_type = "P"
[docs]
def columns(self, connection=None):
"""
Return an iterator over the columns this primary key consists of.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name,
table_name,
r_owner,
r_constraint_name
from
{ddprefix}_constraints
where
constraint_type = 'P' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec2 = cursor.fetchone()
if rec2 is None:
raise SQLObjectNotFoundError(self)
tablename = getfullname(rec2.table_name, rec2.owner)
query = f"""
select
column_name
from
{ddprefix}_cons_columns
where
owner=nvl(:owner, user) and
constraint_name=:name
order by
position
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor:
yield Column(f"{tablename}.{rec.column_name}", self.owner, connection)
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name,
table_name,
r_owner,
r_constraint_name
from
{ddprefix}_constraints
where
constraint_type = 'P' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec2 = cursor.fetchone()
if rec2 is None:
raise SQLObjectNotFoundError(self)
query = f"""
select
column_name
from
{ddprefix}_cons_columns
where
owner=nvl(:owner, user) and
constraint_name=:name
order by
position
"""
cursor.execute(query, owner=self.owner, name=self.name)
tablename = getfullname(rec2.table_name, rec2.owner)
pkname = getfullname(self.name, None)
fields = ", ".join(r.column_name for r in cursor)
code = f"alter table {tablename} add constraint {pkname} primary key({fields})"
if term:
code += ";\n"
else:
code += "\n"
return code
def referencedby(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name
from
{ddprefix}_constraints
where
constraint_type='R' and
r_owner=nvl(:owner, user) and
r_constraint_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor.fetchall():
yield ForeignKey(rec.constraint_name, rec.owner, connection)
# Normally there is an index for this primary key, but we ignore it, as for the purpose of :mod:`orasql` this index doesn't exist
def references(self, connection=None, dependencies=None):
yield self.table(connection)
[docs]
class ForeignKey(Constraint):
"""
Models a foreign key constraint in the database.
"""
type = "fk"
constraint_type = "R"
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
# Add constraint_type to the query, so we don't pick up another constraint by accident
ddprefix = cursor.ddprefix()
query = f"""
select
decode(r_owner, user, null, r_owner) as r_owner,
r_constraint_name,
table_name
from
{ddprefix}_constraints
where
constraint_type = 'R' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
query = f"""
select
column_name
from
{ddprefix}_cons_columns
where
owner=nvl(:owner, user) and
constraint_name=:name
order by
position
"""
cursor.execute(query, owner=self.owner, name=self.name)
fields1 = ", ".join(r.column_name for r in cursor)
query = f"""
select
table_name,
column_name
from
{ddprefix}_cons_columns
where
owner=nvl(:owner, user) and
constraint_name=:name
order by
position
"""
cursor.execute(query, owner=rec.r_owner, name=rec.r_constraint_name)
fields2 = ", ".join(f"{getfullname(r.table_name, rec.r_owner)}({r.column_name})" for r in cursor)
tablename = getfullname(rec.table_name, self.owner)
fkname = getfullname(self.name, None)
code = f"alter table {tablename} add constraint {fkname} foreign key ({fields1}) references {fields2}"
if term:
code += ";\n"
else:
code += "\n"
return code
def referencedby(self, connection=None, dependencies=None):
# Shortcut: Nobody references a foreign key
yield from ()
def references(self, connection=None, dependencies=None):
yield self.table(connection)
yield self.refconstraint(connection)
[docs]
def refconstraint(self, connection=None):
"""
Return the constraint referenced by ``self``.
In most cases this is a :class:`PrimaryKey`, but it also might be a
:class:`UniqueConstraint`.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
c2.constraint_type,
decode(c1.r_owner, user, null, c1.r_owner) as r_owner,
c1.r_constraint_name
from
{ddprefix}_constraints c1,
{ddprefix}_constraints c2
where
c1.constraint_type = 'R' and
c1.owner = nvl(:owner, user) and
c1.constraint_name = :name and
c1.r_owner = c2.owner and
c1.r_constraint_name = c2.constraint_name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
types = {"P": PrimaryKey, "U": UniqueConstraint}
return types[rec.constraint_type](rec.r_constraint_name, rec.r_owner, connection)
[docs]
def columns(self, connection=None):
"""
Return an iterator over the columns this foreign key consists of.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name,
column_name
from
{ddprefix}_cons_columns
where
constraint_name=:name and
owner=nvl(:owner, user)
order by
position
"""
cursor.execute(query, owner=self.owner, name=self.name)
for r in cursor:
yield Column(f"{r.table_name}.{r.column_name}", r.owner, connection)
[docs]
class UniqueConstraint(Constraint):
"""
Models a unique constraint in the database.
"""
type = "unique"
constraint_type = "U"
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
# Add constraint_type to the query, so we don't pick up another constraint by accident
ddprefix = cursor.ddprefix()
query = f"""
select
table_name
from
{ddprefix}_constraints
where
constraint_type='U' and
owner=nvl(:owner, user) and
constraint_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
tablename = getfullname(rec.table_name, self.owner)
uniquename = getfullname(self.name, None)
query = """
select
column_name
from
all_cons_columns
where
owner = nvl(:owner, user) and
constraint_name = :name
order by
position
"""
cursor.execute(query, owner=self.owner, name=self.name)
fields = ", ".join(r.column_name for r in cursor)
code = f"alter table {tablename} add constraint {uniquename} unique({fields})"
if term:
code += ";\n"
else:
code += "\n"
return code
def referencedby(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
constraint_name
from
{ddprefix}_constraints
where
constraint_type = 'R' and
r_owner = nvl(:owner, user) and
r_constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor.fetchall():
yield ForeignKey(rec.constraint_name, rec.owner, connection)
# Normally there is an index for this constraint, but we ignore it, as for the purpose of :mod:`orasql` this index doesn't exist
def references(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name
from
{ddprefix}_constraints
where
constraint_type = 'U' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor.fetchall():
yield Table(rec.table_name, rec.owner, connection)
[docs]
class CheckConstraint(Constraint):
"""
Models a check constraint in the database.
"""
type = "check"
constraint_type = "C"
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
# Add constraint_type to the query, so we don't pick up another constraint by accident
ddprefix = cursor.ddprefix()
query = f"""
select
table_name,
search_condition
from
{ddprefix}_constraints
where
constraint_type = 'C' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
tablename = getfullname(rec.table_name, self.owner)
checkname = getfullname(self.name, None)
code = f"alter table {tablename} add constraint {checkname} check ({rec.search_condition})"
if term:
code += ";\n"
else:
code += "\n"
return code
def referencedby(self, connection=None, dependencies=None):
# Shortcut: Nobody references a check constraint
yield from ()
def references(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(owner, user, null, owner) as owner,
table_name
from
{ddprefix}_constraints
where
constraint_type = 'C' and
owner = nvl(:owner, user) and
constraint_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor.fetchall():
yield Table(rec.table_name, rec.owner, connection)
[docs]
class Index(MixinNormalDates, OwnedSchemaObject):
"""
Models an index in the database.
"""
type = "index"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
if self.isconstraint(connection):
return False
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_indexes
where
owner = nvl(:owner, user) and
index_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
if self.isconstraint(connection):
return ""
ddprefix = cursor.ddprefix()
query = f"""
select
index_name,
table_name,
uniqueness,
index_type,
logging,
ityp_owner,
ityp_name,
parameters
from
{ddprefix}_indexes
where
owner = nvl(:owner, user) and
index_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
tablename = getfullname(rec.table_name, self.owner)
indexname = self.getfullname()
unique = " unique" if rec.uniqueness == "UNIQUE" else ""
logging = " nologging" if rec.logging=="NO" else ""
query = f"""
select
aie.column_expression,
aic.column_name
from
{ddprefix}_ind_columns aic,
{ddprefix}_ind_expressions aie
where
aic.index_owner = aie.index_owner(+) and
aic.index_name = aie.index_name(+) and
aic.column_position = aie.column_position(+) and
aic.index_owner = nvl(:owner, user) and
aic.index_name = :name
order by
aic.column_position
"""
cursor.execute(query, owner=self.owner, name=self.name)
fields = ", ".join(r.column_expression or r.column_name for r in cursor)
code = f"create{unique} index {indexname} on {tablename} ({fields}){logging}"
if rec.index_type == "DOMAIN":
if rec.parameters:
parameters = f" parameters ({sqlliteral(rec.parameters)})"
else:
parameters = ""
code += f" indextype is {rec.ityp_owner}.{rec.ityp_name}{parameters}"
if term:
code += ";\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
if self.isconstraint(connection):
return ""
code = f"drop index {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
[docs]
def rebuildsql(self, connection=None, term=True):
"""
Return SQL code to rebuild this index.
"""
if self.isconstraint(connection):
return ""
code = f"alter index {self.getfullname()} rebuild"
if term:
code += ";\n"
else:
code += "\n"
return code
@classmethod
def names(cls, connection, owner=None):
# We skip those indexes that are generated by a constraint
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
if owner is None:
query = """
select
null as owner,
index_name
from
(
select
index_name
from
user_indexes
where
index_type not in ('LOB', 'IOT - TOP')
minus
select
index_name
from
user_constraints
where
constraint_type in ('U', 'P') and
owner = user
)
where
index_name not like 'BIN$%'
order by
index_name
"""
cursor.execute(query)
elif owner is ALL:
query = f"""
select
decode(owner, user, null, owner) as owner,
index_name
from
(
select
owner,
index_name
from
{ddprefix}_indexes
where
index_type not in ('LOB', 'IOT - TOP')
minus
select
index_owner,
index_name
from
{ddprefix}_constraints
where
constraint_type in ('U', 'P')
)
where
index_name not like 'BIN$%'
order by
owner,
index_name
"""
cursor.execute(query)
elif isinstance(owner, str):
query = f"""
select
decode(owner, user, null, owner) as owner,
index_name
from
(
select
owner,
index_name
from
{ddprefix}_indexes
where
index_type not in ('LOB', 'IOT - TOP') and
owner = :owner
minus
select
index_owner,
index_name
from
{ddprefix}_constraints
where
constraint_type in ('U', 'P') and
index_owner = :owner
)
where
index_name not like 'BIN$%'
order by
owner,
index_name
"""
cursor.execute(query, owner=owner)
else:
query = f"""
select
decode(owner, user, null, owner) as owner,
index_name
from
(
select
owner,
index_name
from
{ddprefix}_indexes
where
index_type not in ('LOB', 'IOT - TOP') and
owner in ({', '.join(sqlstr(o) for o in owner)})
minus
select
index_owner,
index_name
from
{ddprefix}_constraints
where
constraint_type in ('U', 'P') and
index_owner in ({', '.join(sqlstr(o) for o in owner)})
)
where
index_name not like 'BIN$%'
order by
owner,
index_name
"""
cursor.execute(query)
for row in cursor:
yield (row.index_name, row.owner)
def fixname(self, code):
if code.lower().startswith("create unique"):
code = code.split(None, 5)
code = f"create unique index {self.getfullname()} {code[5]}"
else:
code = code.split(None, 4)
code = f"create index {self.getfullname()} {code[4]}"
return code
[docs]
def constraint(self, connection=None):
"""
If this index is generated by a constraint, return the constraint
otherwise return :const:`None`.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
constraint_type
from
{ddprefix}_constraints
where
owner = nvl(:owner, user) and
constraint_name = :name and
constraint_type in ('U', 'P')
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is not None:
rec = {"U": UniqueConstraint, "P": PrimaryKey}[rec.constraint_type](self.name, self.owner, connection)
return rec
[docs]
def isconstraint(self, connection=None):
"""
Is this index generated by a constraint?
"""
return self.constraint(connection) is not None
def references(self, connection=None, dependencies=None):
constraint = self.constraint(connection)
# if self is generated by a constraint (i.e. ``constraint`` is not :const:`None`), we ignore all dependencies (such an index is never produced be :meth:`objects`)
if constraint is None:
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
# If this is a domain index, reference the preferences defined there
query = f"""
select
index_type,
parameters
from
{ddprefix}_indexes
where
owner = nvl(:owner, user) and
index_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec.index_type == "DOMAIN":
parameters = re.split('\\b(datastore|memory|lexer|stoplist|wordlist)\\b', rec.parameters, flags=re.IGNORECASE)
foundparameter = None
for parameter in parameters:
if foundparameter:
if foundparameter.lower() in ("datastore", "lexer", "stoplist", "wordlist"):
(prefowner, sep, prefname) = parameter.strip().partition(".")
if sep:
yield Preference(prefname.upper(), prefowner)
else:
yield Preference(prefowner.upper())
foundparameter = None
elif parameter.lower() in ("datastore", "lexer", "stoplist", "wordlist"):
foundparameter = parameter
yield from super().references(connection)
[docs]
def table(self, connection=None):
"""
Return the :class:`Table` ``self`` belongs to.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
table_name,
decode(table_owner, user, null, table_owner) as table_owner
from
{ddprefix}_indexes
where
owner=nvl(:owner, user) and
index_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return Table(rec.table_name, rec.table_owner, connection)
[docs]
def columns(self, connection=None):
"""
Return an iterator over the columns this index consists of.
"""
(connection, cursor) = self.getcursor(connection)
table = self.table(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
aie.column_expression,
aic.column_name
from
{ddprefix}_ind_columns aic,
{ddprefix}_ind_expressions aie
where
aic.index_owner = aie.index_owner(+) and
aic.index_name = aie.index_name(+) and
aic.column_position = aie.column_position(+) and
aic.index_owner = nvl(:owner, user) and
aic.index_name = :name
order by
aic.column_position
"""
cursor.execute(query, owner=self.owner, name=self.name)
for rec in cursor:
if rec.column_expression is not None:
raise TypeError(f"{self!r} contains an index expression")
yield Column(f"{table.name}.{rec.column_name}", owner=table.owner)
[docs]
class Synonym(OwnedSchemaObject):
"""
Models a synonym in the database.
"""
type = "synonym"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_synonyms
where
owner=nvl(:owner, user) and
synonym_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
decode(table_owner, user, null, table_owner) as table_owner,
table_name,
db_link
from
{ddprefix}_synonyms
where
owner = nvl(:owner, user) and
synonym_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
owner = self.owner
if owner == "PUBLIC":
public = "public "
owner = None
else:
public = ""
name = getfullname(self.name, owner)
name2 = getfullname(rec.table_name, rec.table_owner)
code = f"create or replace {public}synonym {name} for {name2}"
if rec.db_link is not None:
code += f"@{rec.db_link}"
if term:
code += ";\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
owner = self.owner
if owner == "PUBLIC":
public = "public "
owner = None
else:
public = ""
name = getfullname(self.name, owner)
code = f"drop {public}synonym {name}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
if code.lower().startswith("create or replace public"):
code = code.split(None, 6)
code = f"create or replace public synonym {self.getfullname()} {code[6]}"
else:
code = code.split(None, 5)
code = f"create or replace synonym {self.getfullname()} {code[5]}"
return code
[docs]
def object(self, connection=None):
"""
Return the object for which ``self`` is a synonym.
"""
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
table_owner,
table_name,
db_link
from
{ddprefix}_synonyms
where
owner=nvl(:owner, user) and
synonym_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
return connection._getobject(rec.table_name, rec.table_owner)
[docs]
@classmethod
def names(cls, connection, owner=None, object_owner=ALL):
"""
Generator that yields the names of all synonyms. For the meaning of
``owner`` and ``object_owner`` see :meth:`objects`.
Names will be in ascending order.
"""
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
where_sql = []
params = {}
if owner is None:
view_sql = "user_synonyms"
owner_sql = "null as owner"
order_sql = "table_owner, synonym_name"
else:
view_sql = f"{ddprefix}_synonyms"
owner_sql = "decode(owner, user, null, owner) as owner"
order_sql = "owner, table_owner, synonym_name"
if owner is ALL:
pass
elif isinstance(owner, str):
where_sql.append("owner = :owner")
params["owner"] = owner
else:
where_sql.append(f"owner in ({', '.join(sqlstr(o) for o in owner)})")
if object_owner is None:
where_sql.append("table_owner = user")
elif object_owner is ALL:
pass
elif isinstance(object_owner, str):
where_sql.append("table_owner = :table_owner")
params["table_owner"] = object_owner
else:
where_sql.append(f"table_owner in ({', '.join(sqlstr(o) for o in object_owner)})")
query = f"select {owner_sql}, synonym_name from {view_sql}"
if where_sql:
query += f" where {' and '.join(where_sql)}"
query += f" order by {order_sql}"
cursor.execute(query, **params)
for row in cursor:
yield (row.synonym_name, row.owner)
[docs]
@classmethod
def objects(cls, connection, owner=None, object_owner=ALL):
"""
Generator that yields all synonym in the current users schema.
The argument ``owner`` specifies to which owner the synonym must belong
to to be yielded. The argument ``object_owner`` specifies to which owner
the object must belong to to be yielded. For more information see
:func:`owned`.
"""
for name in cls.names(connection, owner, object_owner):
yield cls(name[0], name[1], connection)
[docs]
class View(MixinNormalDates, OwnedSchemaObject):
"""
Models a view in the database.
"""
type = "view"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_views
where
owner=nvl(:owner, user) and
view_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
text
from
{ddprefix}_views
where
owner=nvl(:owner, user) and
view_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
code = "\n".join(line.rstrip() for line in (rec.text or "").strip().splitlines()) # Strip trailing whitespace
code = f"create or replace force view {self.getfullname()} as\n\t{code}"
if term:
code += "\n/\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
code = f"drop view {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 6)
code = f"create or replace force view {self.getfullname()} {code[6]}"
return code
def records(self, connection=None):
(connection, cursor) = self.getcursor(connection)
query = f"select * from {self.getfullname()}"
cursor.execute(query)
return iter(cursor)
[docs]
class MaterializedView(View):
"""
Models a meterialized view in the database.
"""
type = "materialized view"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_mviews
where
owner=nvl(:owner, user) and
mview_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
*
from
{ddprefix}_mviews
where
owner = nvl(:owner, user) and
mview_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
code = "\n".join(line.rstrip() for line in rec.query.strip().splitlines()) # Strip trailing whitespace
code = f"create materialized view {self.getfullname()}\nrefresh {rec.refresh_method} on {rec.refresh_mode} as\n\t{code}"
if term:
code += "\n/\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
code = f"drop materialized view {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 4)
code = f"create materialized view {self.getfullname()} {code[4]}"
return code
def references(self, connection=None, dependencies=None):
# skip the table
for obj in super().references(connection):
if not isinstance(obj, Table) or obj.name != self.name or obj.owner != self.owner:
yield obj
def referencedby(self, connection=None, dependencies=None):
connection = self.getconnection(connection)
yield Table(self.name, self.owner, connection)
[docs]
class Library(OwnedSchemaObject):
"""
Models a library in the database.
"""
type = "library"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
file_spec
from
{ddprefix}_libraries
where
owner = nvl(:owner, user) and
library_name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
file_spec
from
{ddprefix}_libraries
where
owner=nvl(:owner, user) and
library_name=:name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
code = f"create or replace library {self.getfullname()} as {rec.file_spec!r}"
if term:
code += ";\n"
else:
code += "\n"
return code
def dropsql(self, connection=None, term=True):
code = f"drop library {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 5)
code = f"create or replace library {self.getfullname()} {code[5]}"
return code
[docs]
class Argument:
"""
:class:`Argument` objects hold information about the arguments of a
stored procedure.
"""
def __init__(self, name, position, datatype, isin, isout):
self.name = name
self.position = position
self.datatype = datatype
self.isin = isin
self.isout = isout
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} name={self.name!r} position={self.position!r} datatype={self.datatype!r} at {id(self):#x}>"
[docs]
class Callable(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a callable object in the database, i.e. functions and procedures.
"""
_ora2cx = {
"date": datetime.datetime,
"timestamp": datetime.datetime,
"timestamp with time zone": datetime.datetime,
"number": float,
"varchar2": str,
"clob": DB_TYPE_CLOB,
"blob": DB_TYPE_BLOB,
}
def __init__(self, name, owner=None, connection=None):
super().__init__(name, owner, connection)
self._argsbypos = None
self._argsbyname = None
self._returnvalue = None
def _calcargs(self, cursor):
if self._argsbypos is None:
ddprefix = cursor.ddprefix()
if "." in self.name:
(package_name, procedure_name) = self.name.split(".")
query = f"""
select
object_name
from
{ddprefix}_procedures
where
owner = nvl(:owner, user) and
object_name = :package_name and
procedure_name = :procedure_name
"""
cursor.execute(query, owner=self.owner, package_name=package_name, procedure_name=procedure_name)
else:
package_name = None
procedure_name = self.name
query = f"""
select
object_name
from
{ddprefix}_procedures
where
owner = nvl(:owner, user) and
object_name = :name and
procedure_name is null
"""
cursor.execute(query, owner=self.owner, name=procedure_name)
if cursor.fetchone() is None:
raise SQLObjectNotFoundError(self)
self._argsbypos = []
self._argsbyname = {}
if package_name is not None:
query = f"""
select
lower(argument_name) as name,
lower(in_out) as in_out,
lower(data_type) as datatype
from
{ddprefix}_arguments
where
owner = nvl(:owner, user) and
package_name = :package_name and
object_name = :procedure_name and
data_level = 0
order by
sequence
"""
cursor.execute(query, owner=self.owner, package_name=package_name, procedure_name=procedure_name)
else:
query = f"""
select
lower(argument_name) as name,
lower(in_out) as in_out,
lower(data_type) as datatype
from
{ddprefix}_arguments
where
owner = nvl(:owner, user) and
package_name is null and
object_name = :procedure_name and
data_level = 0
order by
sequence
"""
cursor.execute(query, owner=self.owner, procedure_name=procedure_name)
i = 0 # argument position (skip return value)
for record in cursor:
arginfo = Argument(record.name, i, record.datatype, "in" in record.in_out, "out" in record.in_out)
if record.name is None: # this is the return value
self._returnvalue = arginfo
else:
self._argsbypos.append(arginfo)
self._argsbyname[arginfo.name] = arginfo
i += 1
def _getargs(self, cursor, *args, **kwargs):
queryargs = {}
if len(args) > len(self._argsbypos):
raise TypeError(f"too many parameters for {self!r}: {len(args):,} given, {len(self._argsbypos):,} expected")
# Handle positional arguments
for (arg, arginfo) in zip(args, self._argsbypos):
queryargs[arginfo.name] = self._wraparg(cursor, arginfo, arg)
# Handle keyword arguments
for (argname, arg) in kwargs.items():
argname = argname.lower()
if argname in queryargs:
raise TypeError(f"duplicate argument for {self!r}: {argname}")
try:
arginfo = self._argsbyname[argname]
except KeyError:
raise TypeError(f"unknown parameter for {self!r}: {argname}")
queryargs[arginfo.name] = self._wraparg(cursor, arginfo, arg)
# Add out parameters for anything that hasn't been specified
for arginfo in self._argsbypos:
if arginfo.name not in queryargs and arginfo.isout:
queryargs[arginfo.name] = self._wraparg(cursor, arginfo, None)
return queryargs
def _wraparg(self, cursor, arginfo, arg):
try:
if arg is None:
t = self._ora2cx[arginfo.datatype]
else:
t = arg.type if isinstance(arg, Object) else type(arg)
except KeyError:
raise TypeError(f"can't handle parameter {arginfo.name} of type {arginfo.datatype} with value {arg!r} in {self!r}")
if isinstance(arg, bytes): # ``bytes`` is treated as binary data, always wrap it in a ``BLOB``
t = DB_TYPE_BLOB
elif isinstance(arg, str) and len(arg) >= 2000:
t = DB_TYPE_CLOB
var = cursor.var(t)
var.setvalue(0, arg)
return var
def _unwraparg(self, arginfo, cursor, value):
if isinstance(value, LOB):
value = _decodelob(value, cursor.readlobs)
return value
def _makerecord(self, cursor, args):
index2name = []
values = []
for arginfo in self._argsbypos:
name = arginfo.name
if name in args:
index2name.append(name)
values.append(self._unwraparg(arginfo, cursor, args[name].getvalue(0)))
name2index = dict(zip(index2name, itertools.count()))
return Record(index2name, name2index, values)
[docs]
def arguments(self, connection=None):
"""
Generator that yields all arguments of the function/procedure ``self``.
"""
(connection, cursor) = self.getcursor(connection)
self._calcargs(cursor)
yield from self._argsbypos
def synonyms(self, connection=None):
if "." not in self.name:
yield from super().synonyms(connection)
else:
yield from ()
def privileges(self, connection=None):
if "." not in self.name:
yield from super().privileges(connection)
else:
yield from ()
[docs]
class Procedure(Callable):
"""
Models a procedure in the database. A :class:`Procedure` object can be
used as a wrapper for calling the procedure with keyword arguments.
"""
type = "procedure"
[docs]
def __call__(self, cursor, *args, **kwargs):
"""
Call the procedure with arguments ``args`` and keyword arguments
``kwargs``. ``cursor`` must be a :class:`ll.orasql.Cursor` object.
This will return a :class:`Record` object containing the result of the
call (i.e. this record will contain all specified and all out parameters).
"""
self._calcargs(cursor)
if self.owner is None:
name = self.name
else:
name = f"{self.owner}.{self.name}"
queryargs = self._getargs(cursor, *args, **kwargs)
queryargstring = ", ".join(f"{name}=>:{name}" for name in queryargs)
query = f"begin {name}({queryargstring}); end;"
cursor.execute(query, queryargs)
return self._makerecord(cursor, queryargs)
[docs]
class Function(Callable):
"""
Models a function in the database. A :class:`Function` object can be
used as a wrapper for calling the function with keyword arguments.
"""
type = "function"
[docs]
def __call__(self, cursor, *args, **kwargs):
"""
Call the function with arguments ``args`` and keyword arguments
``kwargs``. ``cursor`` must be an :class:`ll.orasql.Cursor` object.
This will return a tuple containing the result and a :class:`Record`
object containing the modified parameters (i.e. this record will contain
all specified and out parameters).
"""
self._calcargs(cursor)
if self.owner is None:
name = self.name
else:
name = f"{self.owner}.{self.name}"
queryargs = self._getargs(cursor, *args, **kwargs)
returnvalue = "r"
while returnvalue in queryargs:
returnvalue += "_"
queryargs[returnvalue] = self._wraparg(cursor, self._returnvalue, None)
queryargstring = ", ".join(f"{name}=>:{name}" for name in queryargs if name != returnvalue)
query = f"begin :{returnvalue} := {name}({queryargstring}); end;"
cursor.execute(query, queryargs)
returnvalue = self._unwraparg(self._returnvalue, cursor, queryargs.pop(returnvalue).getvalue(0))
return (returnvalue, self._makerecord(cursor, queryargs))
[docs]
class Package(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a package in the database.
"""
type = "package"
[docs]
class PackageBody(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a package body in the database.
"""
type = "package body"
def synonyms(self, connection=None):
yield from ()
def privileges(self, connection=None):
yield from ()
[docs]
class Type(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a type definition in the database.
"""
type = "type"
[docs]
class TypeBody(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a type body in the database.
"""
type = "type body"
def synonyms(self, connection=None):
yield from ()
def privileges(self, connection=None):
yield from ()
[docs]
class Trigger(MixinNormalDates, MixinCodeSQL, OwnedSchemaObject):
"""
Models a trigger in the database.
"""
type = "trigger"
[docs]
class JavaSource(MixinNormalDates, OwnedSchemaObject):
"""
Models Java source code in the database.
"""
type = "java source"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_source
where
type = 'JAVA SOURCE' and
owner = nvl(:owner, user) and
name = :name
"""
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"""
select
text
from
{ddprefix}_source
where
type = 'JAVA SOURCE' and
owner = nvl(:owner, user) and
name = :name
order by
line
"""
cursor.execute(query, owner=self.owner, name=self.name)
code = "\n".join((rec.text or "").rstrip() for rec in cursor)
code = code.strip()
code = f"create or replace and compile java source named {self.getfullname()} as\n{code}\n"
if term:
code += "/\n"
return code
def dropsql(self, connection=None, term=True):
code = f"drop java source {self.getfullname()}"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
code = code.split(None, 9)
code = f"create or replace and compile java source named {self.getfullname()} {code[9]}"
return code
[docs]
class Privilege:
"""
Models a database object privilege (i.e. a grant).
A :class:`Privilege` object has the following attributes:
``privilege`` : string
The type of the privilege (``EXECUTE`` etc.)
``name`` : string
The name of the object for which this privilege grants access
``owner`` : string or :const:`None`
the owner of the object
``grantor`` : string or :const:`None`
Who granted this privilege?
``grantee`` : string or :const:`None`
To whom has this privilege been granted?
``connection`` : :class:`Connection` or :const:`None`
The database connection
"""
type = "privilege"
def __init__(self, privilege, name, grantor, grantee, owner=None, connection=None):
self.privilege = privilege
self.name = name
self.grantor = grantor
self.grantee = grantee
self.owner = owner
self.connection = connection
def __repr__(self):
if self.owner is not None:
return f"{self.__class__.__module__}.{self.__class__.__qualname__}({self.privilege!r}, {self.name!r}, {self.grantee!r}, {self.owner!r})"
else:
return f"{self.__class__.__module__}.{self.__class__.__qualname__}({self.privilege!r}, {self.name!r}, {self.grantee!r})"
def __str__(self):
if self.owner is not None:
return f"{self.privilege} privilege on {self.name} @ {self.owner} by {self.grantor} to {self.grantee}"
else:
return f"{self.privilege} privilege on {self.name} by {self.grantor} to {self.grantee}"
def getconnection(self, connection):
if connection is None:
connection = self.connection
if connection is None:
raise TypeError("no connection available")
return connection
def getcursor(self, connection):
connection = self.getconnection(connection)
return (connection, connection.cursor())
def getconnectstring(self):
if self.connection:
return self.connection.connectstring()
return None
connectstring = property(getconnectstring)
[docs]
def object(self, connection=None):
"""
Return the object on which ``self`` grants a privilege.
"""
(connection, cursor) = self.getcursor(connection)
return connection._getobject(self.name, self.grantor)
[docs]
@classmethod
def objects(cls, connection, owner=None):
"""
Generator that yields object privileges. For the meaning of ``owner``
see :func:`owned`.
"""
cursor = connection.cursor() # can't use :meth:`getcursor` as we're in a classmethod
if owner is None:
query = """
select
null as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
user_tab_privs
where
owner = user
order by
grantor,
table_name,
privilege,
grantee
"""
cursor.execute(query)
elif owner is ALL:
ddprefix = cursor.ddprefix()
# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
if ddprefix == "all":
query = """
select
decode(table_schema, user, null, table_schema) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
all_tab_privs
order by
grantor,
table_name,
privilege,
grantee
"""
else:
query = """
select
decode(owner, user, null, owner) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
dba_tab_privs
order by
grantor,
table_name,
privilege,
grantee
"""
cursor.execute(query)
elif isinstance(owner, str):
ddprefix = cursor.ddprefix()
query = """
select
decode(table_schema, user, null, table_schema) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
{ddprefix}_tab_privs
where
table_schema = :owner
order by
table_schema,
table_name,
privilege
"""
cursor.execute(query, owner=owner)
else:
ddprefix = cursor.ddprefix()
# The column names in ``ALL_TAB_PRIVS`` and ``DBA_TAB_PRIVS`` are different, so we have to use two different queries
if ddprefix == "all":
query = f"""
select
decode(table_schema, user, null, table_schema) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
all_tab_privs
where
table_schema in ({', '.join(sqlstr(o) for o in owner)})
order by
table_name,
privilege
"""
else:
query = f"""
select
decode(owner, user, null, owner) as owner,
privilege,
table_name as object,
decode(grantor, user, null, grantor) as grantor,
grantee
from
dba_tab_privs
where
owner in ({', '.join(sqlstr(o) for o in owner)})
order by
table_name,
privilege
"""
cursor.execute(query)
for rec in cursor:
yield Privilege(rec.privilege, rec.object, rec.grantor, rec.grantee, rec.owner, connection)
[docs]
def grantsql(self, connection=None, term=True, mapgrantee=True):
"""
Return SQL code to grant this privilege. If ``mapgrantee`` is a list
or a dictionary and ``self.grantee`` is not in this list (or dictionary)
no command will be returned. If it's a dictionary and ``self.grantee`` is
in it, the privilege will be granted to the user specified as the value
instead of the original one. If ``mapgrantee`` is true (the default)
the privilege will be granted to the original grantee.
"""
(connection, cursor) = self.getcursor(connection)
if mapgrantee is True:
grantee = self.grantee
elif isinstance(mapgrantee, (set, list, tuple)):
if self.grantee.lower() in (g.lower() for g in mapgrantee):
grantee = self.grantee
else:
grantee = None
else:
mapgrantee = {key.lower(): value for (key, value) in mapgrantee.items()}
grantee = mapgrantee.get(self.grantee.lower(), None)
if grantee is None:
return ""
code = f"grant {self.privilege} on {self.name} to {grantee}"
if term:
code += ";\n"
return code
[docs]
class Column(OwnedSchemaObject):
"""
Models a single column of a table in the database. This is used to output
``ALTER TABLE`` statements for adding, dropping and modifying columns.
"""
type = "column"
@property
def tablename(self):
return self.name.partition(".")[0]
@property
def columnname(self):
return self.name.partition(".")[-1]
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
name = self.name.split(".")
ddprefix = cursor.ddprefix()
query = f"""
select
1
from
{ddprefix}_tab_columns
where
owner = nvl(:owner, user) and
table_name = :table_name and
column_name = :column_name
"""
cursor.execute(query, owner=self.owner, table_name=name[0], column_name=name[1])
rec = cursor.fetchone()
return rec is not None
def _getcolumnrecord(self, cursor):
name = self.name.split(".")
ddprefix = cursor.ddprefix()
query = f"""
select
*
from
{ddprefix}_tab_columns c,
{ddprefix}_lobs l
where
c.owner = nvl(:owner, user) and
c.table_name = :table_name and
c.column_name = :column_name and
c.owner = l.owner(+) and
c.table_name = l.table_name(+) and
c.column_name = l.column_name(+)
"""
cursor.execute(query, owner=self.owner, table_name=name[0], column_name=name[1])
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
return rec
def addsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
name = self.name.split(".")
code = [f"alter table {getfullname(name[0], self.owner)} add {getfullname(name[1], None)}"]
code.append(f" {_columntype(rec)}")
default = _columndefault(rec)
if default != "null":
code.append(f" default {default}")
if rec.nullable == "N":
code.append(" not null")
if term:
code.append(";\n")
else:
code.append("\n")
return "".join(code)
def modifysql(self, connection, cursorold, cursornew, term=True):
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
recold = self._getcolumnrecord(cursorold)
recnew = self._getcolumnrecord(cursornew)
name = self.name.split(".")
code = [f"alter table {getfullname(name[0], self.owner)} modify {getfullname(name[1], None)}"]
# Has the type changed?
if recold.data_precision != recnew.data_precision or recold.data_length != recnew.data_length or recold.data_scale != recnew.data_scale or recold.char_length != recnew.char_length or recold.data_type != recnew.data_type or recold.data_type_owner != recnew.data_type_owner:
# Has only the size changed?
if rec.data_type == recold.data_type == recnew.data_type and rec.data_type_owner == recold.data_type_owner == recnew.data_type_owner:
try:
data_precision = max(r.data_precision for r in (rec, recold, recnew) if r.data_precision is not None)
except ValueError:
data_precision = None
try:
data_scale = max(r.data_scale for r in (rec, recold, recnew) if r.data_scale is not None)
except ValueError:
data_scale = None
try:
char_length = max(r.char_length for r in (rec, recold, recnew) if r.char_length is not None)
except ValueError:
char_length = None
columntype = _columntype(rec, data_precision=data_precision, data_scale=data_scale, char_length=char_length)
code.append(f" {columntype}")
else: # The type has changed too
if recnew.data_type != rec.data_type or recnew.data_type_owner != rec.data_type_owner:
raise ConflictError(self, "data_type unmergeable")
elif recnew.data_precision != rec.data_precision:
raise ConflictError(self, "data_precision unmergeable")
elif recnew.data_scale != rec.data_scale:
raise ConflictError(self, "data_scale unmergeable")
elif recnew.char_length != rec.char_length:
raise ConflictError(self, "char_length unmergeable")
code.append(f" {_columntype(recnew)}")
# Has the default changed?
default = _columndefault(rec)
olddefault = _columndefault(recold)
newdefault = _columndefault(recnew)
if olddefault != newdefault:
if newdefault != default:
raise ConflictError(self, "default value unmergable")
code.append(f" default {newdefault}")
# Check nullability
if recold.nullable != recnew.nullable:
if recnew.nullable == "N":
code.append(" not null")
else:
code.append(" null")
if term:
code.append(";\n")
else:
code.append("\n")
return "".join(code)
def dropsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
name = self.name.split(".")
code = f"alter table {getfullname(name[0], self.owner)} drop column {getfullname(name[1], None)}"
if term:
code += ";\n"
else:
code += "\n"
return code
def table(self):
name = self.name.split(".")
return Table(name[0], self.owner, self.connection)
def cdate(self, connection=None):
# The column creation date is the table creation date
return self.table().cdate(connection)
def udate(self, connection=None):
# The column modification date is the table modification date
return self.table().udate(connection)
def references(self, connection=None, dependencies=None):
connection = self.getconnection(connection)
name = self.name.split(".")
yield Table(name[0], self.owner, connection)
[docs]
def datatype(self, connection=None):
"""
The SQL type of this column.
"""
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
return _columntype(rec)
[docs]
def default(self, connection=None):
"""
The SQL default value for this column.
"""
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
return _columndefault(rec)
[docs]
def nullable(self, connection=None):
"""
Is this column nullable?
"""
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
return rec.nullable == "Y"
[docs]
def compression(self, connection=None):
"""
The compression mode for this LOB column.
Return ``None`` if this is not a LOB column, or it isn't compressed.
"""
(connection, cursor) = self.getcursor(connection)
rec = self._getcolumnrecord(cursor)
compression = rec.compression
if compression == "NO":
compression = None
return compression
def synonyms(self, connection=None):
yield from ()
def privileges(self, connection=None):
yield from ()
[docs]
class User(SchemaObject):
"""
Models a user in the database.
"""
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select 1 from {ddprefix}_users where username = :username"
cursor.execute(query, username=self.name)
rec = cursor.fetchone()
return rec is not None
[docs]
@classmethod
def names(cls, connection):
"""
Generator that yields the names of all users in ascending order
"""
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
query = f"select username from {ddprefix}_users order by username"
cursor.execute(query)
for row in cursor:
yield row.username
[docs]
@classmethod
def objects(cls, connection):
"""
Generator that yields all user objects.
"""
for name in cls.names(connection):
yield cls(name, connection)
[docs]
class Preference(OwnedSchemaObject):
"""
Models a preference in the database.
"""
type = "preference"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
query = "select 1 from ctx_preferences where pre_owner = nvl(:owner, user) and pre_name = :name"
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
query = "select pre_object from ctx_preferences where pre_owner = nvl(:owner, user) and pre_name = :name"
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
name = self.getfullname()
code = ["begin\n"]
code.append(f"\tctx_ddl.create_preference({sqlliteral(name)}, {sqlliteral(rec.pre_object)});\n")
cursor.execute("select prv_attribute, prv_value from ctx_preference_values where prv_owner=nvl(:owner, user) and prv_preference=:name", owner=self.owner, name=self.name)
for rec in cursor:
code.append(f"\tctx_ddl.set_attribute({sqlliteral(name)}, {sqlliteral(rec.prv_attribute)}, {sqlliteral(rec.prv_value)});\n")
code.append("end;\n")
code = "".join(code)
if term:
code += "/\n"
return code
def dropsql(self, connection=None, term=True):
name = self.getfullname()
code = f"begin\n\tctx_ddl.drop_preference({sqlliteral(name)});\nend;\n"
if term:
code += "/\n"
return code
# FIXME: Implement :meth:`referencedby` by parsing the parameters of all domain indexes
# and output those indexes here that reference us in any of their parameters
[docs]
@classmethod
def names(cls, connection, owner=None):
"""
Generator that yields the names of all preferences.
"""
cursor = connection.cursor()
try:
if owner is None:
query = "select null as owner, pre_name from ctx_preferences where pre_owner=user order by pre_name"
cursor.execute(query)
elif owner is ALL:
query = "select pre_owner as owner, pre_name from ctx_preferences order by pre_owner, pre_name"
cursor.execute(query)
elif isinstance(owner, str):
query = "select decode(pre_owner, user, null, pre_owner) as owner, pre_name from ctx_preferences where pre_owner = :owner order by pre_name"
cursor.execute(query, owner=owner)
else:
query = f"select pre_owner as owner, pre_name from ctx_preferences where pre_owner in ({', '.join(sqlstr(o) for o in owner)}) order by pre_owner, pre_name"
cursor.execute(query)
except DatabaseError as exc:
if exc.args[0].code == 942: # ORA-00942: table or view does not exist
return iter(())
else:
raise
else:
for row in cursor:
yield (row.pre_name, row.owner)
[docs]
@classmethod
def objects(cls, connection, owner=None):
"""
Generator that yields all preferences.
"""
for name in cls.names(connection, owner=owner):
yield cls(name[0], name[1], connection)
[docs]
class JobClass(SchemaObject):
"""
Models a job class (from the ``dbms_scheduler`` package) in the database.
"""
type = "jobclass"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select 1 from {ddprefix}_scheduler_job_classes where job_class_name = :name"
cursor.execute(query, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select * from {ddprefix}_scheduler_job_classes where job_class_name = :name"
cursor.execute(query, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
if rec.logging_level is None:
logging_level = "null"
else:
logging_level = rec.logging_level.replace(" ", "_").lower()
logging_level = f"dbms_scheduler.logging_{logging_level}"
code = []
code.append(f"declare\n")
code.append(f"\tv_jobname varchar2(128) := {sqlliteral(rec.job_class_name)};\n")
code.append(f"begin\n")
code.append(f"\tsys.dbms_scheduler.create_job_class(\n")
code.append(f"\t\tjob_class_name=>v_jobname,\n")
code.append(f"\t\tresource_consumer_group=>{sqlliteral(rec.resource_consumer_group)},\n")
code.append(f"\t\tservice=>{sqlliteral(rec.service)},\n")
code.append(f"\t\tlogging_level=>{logging_level},\n")
code.append(f"\t\tlog_history=>{sqlliteral(rec.log_history)},\n")
code.append(f"\t\tcomments=>''\n")
code.append(f"\t);\n")
code.append(f"end;\n")
if term:
code.append("/\n")
code = "".join(code)
return code
def dropsql(self, connection=None, term=True):
code = f"dbms_scheduler.drop_job_class(job_class_name=>{sqlliteral(self.name)})"
if term:
code += ";\n"
else:
code += "\n"
return code
def fixname(self, code):
if code:
code = code.splitlines(True)
name = self.getfullname()
code[1] = f"\tv_jobname varchar2(128) := {sqlliteral(name)};\n"
code = "".join(code)
return code
[docs]
@classmethod
def names(cls, connection):
"""
Generator that yields the names of all job classes.
"""
cursor = connection.cursor()
ddprefix = cursor.ddprefix()
query = f"select job_class_name from {ddprefix}_scheduler_job_classes order by job_class_name"
cursor.execute(query)
for row in cursor:
yield row.job_class_name
[docs]
@classmethod
def objects(cls, connection):
"""
Generator that yields all job classes.
"""
for name in cls.names(connection):
yield cls(name, connection)
def references(self, connection=None, dependencies=None):
yield from ()
def referencedby(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select decode(owner, user, null, owner) as owner, job_name from {ddprefix}_scheduler_jobs where job_class = :name"
cursor.execute(query, name=self.name)
for rec in cursor:
yield Job(rec.job_name, rec.owner, connection)
[docs]
class Job(OwnedSchemaObject):
"""
Models a job (from the ``dbms_scheduler`` package) in the database.
"""
type = "job"
def exists(self, connection=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select 1 from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name"
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
return rec is not None
def createsql(self, connection=None, term=True):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select * from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name"
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
enabled = "true" if rec.enabled == "TRUE" and rec.number_of_arguments == 0 else "false"
auto_drop = "true" if rec.auto_drop == "TRUE" else "false"
code = []
code.append(f"declare\n")
code.append(f"\tv_jobname varchar2(128) := {sqlliteral(rec.job_name)};\n")
code.append(f"begin\n")
code.append(f"\tsys.dbms_scheduler.create_job(\n")
code.append(f"\t\tjob_name=>v_jobname,\n")
code.append(f"\t\tjob_type=>{sqlliteral(rec.job_type)},\n")
code.append(f"\t\tjob_action=>{sqlliteral(rec.job_action)},\n")
code.append(f"\t\tnumber_of_arguments=>{sqlliteral(rec.number_of_arguments)},\n")
code.append(f"\t\tstart_date=>{sqlliteral(rec.start_date)},\n")
code.append(f"\t\trepeat_interval=>{sqlliteral(rec.repeat_interval)},\n")
code.append(f"\t\tend_date=>{sqlliteral(rec.end_date)},\n")
code.append(f"\t\tjob_class=>{sqlliteral(rec.job_class)},\n")
code.append(f"\t\tenabled=>{enabled},\n")
code.append(f"\t\tauto_drop=>{auto_drop},\n")
code.append(f"\t\tcomments=>{sqlliteral(rec.comments)}\n")
code.append(f"\t);\n")
query = f"select argument_position, argument_type, value from {ddprefix}_scheduler_job_args where owner = nvl(:owner, user) and job_name = :job_name order by argument_position"
cursor.execute(query, owner=self.owner, job_name=rec.job_name)
for rec2 in cursor:
code.append(f"\n")
code.append(f"\tsys.dbms_scheduler.set_job_argument_value(\n")
code.append(f"\t\tjob_name=>v_jobname,\n")
code.append(f"\t\targument_position=>{sqlliteral(rec2.argument_position)},\n")
code.append(f"\t\targument_value=>{sqlliteral(rec2.value)}\n")
code.append(f"\t);\n")
if rec.enabled == "TRUE" and rec.number_of_arguments and rec.number_of_arguments > 0:
code.append(f"\n")
code.append(f"\tsys.dbms_scheduler.enable(v_jobname);\n")
code.append("end;\n")
if term:
code.append("/\n")
code = "".join(code)
return code
def dropsql(self, connection=None, term=True):
name = self.getfullname()
code = f"begin\n\tdbms_scheduler.drop_job(job_name=>{sqlliteral(name)}, force=>true);\nend;\n"
if term:
code += "/\n"
return code
def fixname(self, code):
if code:
code = code.splitlines(True)
name = self.getfullname()
code[1] = f"\tv_jobname varchar2(128) := {sqlliteral(name)};\n"
code = "".join(code)
return code
[docs]
@classmethod
def names(cls, connection, owner=None):
"""
Generator that yields the names of all jobs.
"""
cursor = connection.cursor()
if owner is None:
query = "select null as owner, job_name from user_scheduler_jobs order by job_name"
cursor.execute(query)
elif owner is ALL:
ddprefix = cursor.ddprefix()
query = f"select owner, job_name from {ddprefix}_scheduler_jobs order by owner, job_name"
cursor.execute(query)
elif isinstance(owner, str):
ddprefix = cursor.ddprefix()
query = f"select owner, job_name from {ddprefix}_scheduler_jobs where owner = :owner order by job_name"
cursor.execute(query, owner=owner)
else:
ddprefix = cursor.ddprefix()
query = f"select owner, job_name from {ddprefix}_scheduler_jobs where owner in ({', '.join(sqlstr(o) for o in owner)}) order by owner, job_name"
cursor.execute(query)
for row in cursor:
yield (row.job_name, row.owner)
[docs]
@classmethod
def objects(cls, connection, owner=None):
"""
Generator that yields all jobs.
"""
for name in cls.names(connection, owner=owner):
yield cls(name[0], name[1], connection)
def references(self, connection=None, dependencies=None):
(connection, cursor) = self.getcursor(connection)
ddprefix = cursor.ddprefix()
query = f"select job_class from {ddprefix}_scheduler_jobs where owner = nvl(:owner, user) and job_name = :name"
cursor.execute(query, owner=self.owner, name=self.name)
rec = cursor.fetchone()
if rec is None:
raise SQLObjectNotFoundError(self)
# We don't want to reference any system job classes
system_classes = {"DEFAULT_JOB_CLASS", "DEFAULT_IN_MEMORY_JOB_CLASS", "XMLDB_NFS_JOBCLASS"}
if "$" not in rec.job_class and rec.job_class not in system_classes:
yield JobClass(rec.job_class, connection)
def referencedby(self, connection=None, dependencies=None):
# Oracle does not provide any information about which procedures etc.
# the job calls, so skip the check
# (To still be able to get objects in a usable order
# :meth:`Connect.objects` outputs jobs last.)
yield from ()
###
### Classes that add an ``oracle`` scheme to the urls supported by :mod:`ll.url`.
###
class OracleURLConnection(url_.Connection):
def __init__(self, context, connection, mode):
self.dbconnection = connect(connection, mode=mode) if mode is not None else connect(connection)
def open(self, url, mode="rb", encoding="utf-8", errors="strict"):
return OracleFileResource(self, url, mode, encoding, errors)
def close(self):
self.dbconnection.close()
def _type(self, url):
path = url.path
if path and not path[-1]:
path = path[:-1]
lp = len(path)
if lp == 0:
return "root"
elif lp == 1:
if path[0] == "user":
return "allusers"
else:
return "type"
elif lp == 2:
if path[0] == "user":
return "user"
else:
return "object"
elif lp == 3:
if path[0] == "user":
return "usertype"
elif lp == 4:
if path[0] == "user":
return "userobject"
raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None
def _infofromurl(self, url):
type = self._type(url)
if type == "root":
owner = None
objectype = None
name = None
elif type == "allusers":
owner = None
objectype = None
name = None
elif type == "type":
owner = None
objectype = None
name = None
elif type == "user":
owner = url.path[1]
objectype = None
name = None
elif type == "object":
owner = None
objectype = url.path[0]
name = url.path[1]
elif type == "usertype":
owner = url.path[1]
objectype = url.path[2]
name = None
else:
owner = url.path[1]
objectype = url.path[2]
name = url.path[3]
if name is not None:
if name.lower().endswith(".sql"):
name = name[:-4]
name = unicodedata.normalize('NFC', name)
return (type, owner, objectype, name)
def _objectfromurl(self, url):
(type, owner, objecttype, name) = self._infofromurl(url)
if objecttype not in SchemaObject.name2type:
raise ValueError(f"don't know how to handle {url!r}")
return SchemaObject.name2type[objecttype](name, owner)
def isdir(self, url):
return not self._type(url).endswith("object")
def isfile(self, url):
return self._type(url).endswith("object")
def mimetype(self, url):
if self.isdir(url):
return "application/octet-stream"
type = url.path[0 if url.path[0] != "user" else 2]
return f"text/x-oracle-{type}"
def owner(self, url):
if len(url.path) >= 2 and url.path[0] == "user" and url.path[1]:
return url.path[1]
else:
c = self.dbconnection.cursor()
c.execute("select user from dual")
return c.fetchone()[0]
def exists(self, url):
try:
type = self._type(url)
except FileNotFoundError:
return False
if type.endswith("object"):
return self._objectfromurl(url).exists(self.dbconnection)
else:
return True
def cdate(self, url):
if self.isdir(url):
return bigbang
try:
obj = self._objectfromurl(url)
except SQLNoSuchObjectError:
raise FileNotFoundError(errno.ENOENT, f"no such file: {url!r}")
return obj.cdate(self.dbconnection)
def mdate(self, url):
if self.isdir(url):
return bigbang
try:
obj = self._objectfromurl(url)
except SQLNoSuchObjectError:
raise FileNotFoundError(errno.ENOENT, f"no such file: {url!r}")
return obj.udate(self.dbconnection)
def _walk(self, cursor, url):
def _event(url, event):
cursor.url = url
cursor.event = event
cursor.isdir = event != "file"
cursor.isfile = not cursor.isdir
return cursor
def _dir(childname):
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
enterdir = cursor.enterdir
if emitbeforedir or enterdir or emitafterdir:
childurl = url / childname
if emitbeforedir:
yield _event(childurl, "beforedir")
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
enterdir = cursor.enterdir
cursor.restore()
if enterdir:
yield from self._walk(cursor, childurl)
if emitafterdir:
yield _event(childurl, "afterdir")
cursor.restore()
absurl = cursor.rooturl / url
type = self._type(absurl)
if type == "root": # directory of types for the current user
for childname in sorted(SchemaObject.name2type):
if childname not in ("tablecomment", "columncomment", "column"):
yield from _dir(f"{childname}/")
elif type == "type": # directory of objects of the specified type for current user
path = absurl.path
type = path[0]
try:
class_ = SchemaObject.name2type[type]
except KeyError:
raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None
if issubclass(class_, OwnedSchemaObject):
names = (name for (name, owner) in class_.names(self.dbconnection, None))
else:
names = class_.names(self.dbconnection)
for name in names:
if cursor.file:
yield _event(url / f"{makeurl(name)}.sql", "file")
cursor.restore()
elif type == "allusers": # directory of all users
path = url.path
for name in User.names(self.dbconnection):
yield from _dir(f"{makeurl(name)}/")
elif type == "user": # directory of types for a specific user
path = absurl.path
for childname in sorted(SchemaObject.name2type):
if childname not in ("tablecomment", "columncomment", "column"):
yield from _dir(f"{childname}/")
elif type == "usertype": # directory of objects of the specified type for a specific user
path = absurl.path
type = path[2]
try:
class_ = SchemaObject.name2type[type]
except KeyError:
raise FileNotFoundError(errno.ENOENT, f"no such file or directory: {url!r}") from None
if issubclass(class_, OwnedSchemaObject):
names = (name for (name, owner) in class_.names(self.dbconnection, path[1]))
else:
names = class_.names(self.dbconnection)
for name in names:
if cursor.file:
yield _event(url / f"{makeurl(name)}.sql", "file")
cursor.restore()
else:
raise NotADirectoryError(errno.ENOTDIR, f"Not a directory: {url}")
def walk(self, url, beforedir=True, afterdir=False, file=True, enterdir=True):
cursor = url_.Cursor(url, beforedir=beforedir, afterdir=afterdir, file=file, enterdir=enterdir)
return self._walk(cursor, url_.URL())
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} to {self.connection.connectstring()!r} at {id(self):#x}>"
[docs]
class OracleFileResource(url_.Resource):
"""
An :class:`OracleFileResource` wraps an Oracle database object (like a
table, view, function, procedure etc.) in a file-like API for use with
:mod:`ll.url`.
"""
def __init__(self, connection, url, mode="r", encoding="utf-8", errors="strict"):
self.connection = connection
self.url = url
self.mode = mode
self.encoding = encoding
self.errors = errors
self.closed = False
self.name = str(self.url)
if "w" in self.mode:
if "b" in self.mode:
self.stream = io.BytesIO()
else:
self.stream = io.StringIO()
else:
code = self.connection._objectfromurl(url).createsql(self.connection.dbconnection, term=False)
if "b" in self.mode:
code = code.encode(self.encoding, self.errors)
self.stream = io.BytesIO(code)
else:
self.stream = io.StringIO(code)
def read(self, size=-1):
if self.closed:
raise ValueError("I/O operation on closed file")
return self.stream.read(size)
def write(self, data):
if self.closed:
raise ValueError("I/O operation on closed file")
return self.stream.write(data)
def mimetype(self):
return self.connection.mimetype(self.url)
def cdate(self):
return self.connection.cdate(self.url)
def mdate(self):
return self.connection.mdate(self.url)
def __iter__(self):
data = self.read()
return iter(data.splitlines(True))
def close(self):
if not self.closed:
if "w" in self.mode:
obj = self.connection._objectfromurl(self.url)
code = self.stream.getvalue()
if isinstance(code, bytes):
code = code.decode(self.encoding, self.errors)
code = obj.fixname(code)
cursor = self.connection.dbconnection.cursor()
cursor.execute(code)
self.stream = None
self.closed = True
class OracleSchemeDefinition(url_.SchemeDefinition):
def _connect(self, url, context=None, **kwargs):
context = url_.getcontext(context)
# Use one :class:`OracleURLConnection` for each ``user@host`` combination
server = url.server
try:
connections = context.schemes["oracle"]
except KeyError:
connections = context.schemes["oracle"] = {}
try:
connection = connections[server]
except KeyError:
userinfo = url.userinfo.split(":")
lui = len(userinfo)
if lui == 2:
mode = None
elif lui == 3:
try:
mode = dict(sysoper=SYSOPER, sysdba=SYSDBA, normal=None)[userinfo[2]]
except KeyError:
raise ValueError(f"unknown connect mode {userinfo[2]!r}")
else:
raise ValueError(f"illegal userinfo {url.userinfo!r}")
connection = connections[server] = OracleURLConnection(context, f"{userinfo[0]}/{userinfo[1]}@{url.host}", mode)
return (connection, kwargs)
def open(self, url, mode="rb", context=None):
(connection, kwargs) = self._connect(url, context)
return OracleFileResource(connection, url, mode, **kwargs)
def closeall(self, context):
for connection in context.schemes["oracle"].values():
connection.close()
url_.schemereg["oracle"] = OracleSchemeDefinition("oracle", usehierarchy=True, useserver=True, usefrag=False, islocal=False, isremote=True)