from __future__ import annotations
import json
import re
import warnings
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path
from typing import Any, cast
import ldap
from ldap_filter import Filter # type: ignore[reportUnknownVariableType]
from ldap_filter.parser import ParseError
from .hooks import hooks
from .types import (
AddModList,
Attrlist,
CILDAPData,
LDAPData,
LDAPObjectStore,
LDAPOptionStore,
LDAPOptionValue,
LDAPRecord,
LDAPSearchResult,
ModList,
RawLDAPObjectStore,
)
[docs]@dataclass
class LDAPCallRecord:
"""
A single LDAP call record, used by :py:class:`CallHistory` to store
information about calls to LDAP api functions.
:py:attr:`api_name` is the name of the LDAP api call made
(e.g. ``simple_bind_s``, ``search_s``).
:py:attr:`args` is the argument list of the call, including defaults for
keyword arguments not passed. This is a dict where the key is the name of
the positional or keyword argument, and the value is the passed in (or
default) value for that argument.
Example:
If we make this call to a patched :py:class:`FakeLDAPObject`::
ldap_obj.search_s('ou=bar,o=baz,c=country', ldap.SCOPE_SUBTREE, '(uid=foo)')
This will be recorded as::
LDAPCallRecord(
api_name='search_s',
args={
'base': 'ou=bar,o=baz,c=country',
'scope': 2,
'filterstr': '(uid=foo)',
'attrlist': None,
'attrsonly': 0
}
)
"""
api_name: str #: the name LDAP api call
args: dict[str, Any] #: the args and kwargs dict
[docs]class LDAPServerFactory:
"""
Registers :py:class:`ObjectStore` objects to be used by
``FakeLDAP.initialize()`` in constructing :py:class:`FakeLDAPObject` objects.
:py:class:`ObjectStore` objects are named registered here by LDAP uri (in reality,
any string).
You may do one of two things, but not both:
* Configure a default :py:class:`ObjectStore` that will be used for all
:py:func:`ldap.initialize` calls regardless of ``uri``
* Assign a specific :py:class:`ObjectStore` for each ``uri`` you will be using
in your code.
Example:
To register a default :py:class:`ObjectStore` that will be used for every
``uri`` passed to :py:meth:`FakeLDAP.initialize`:
>>> from ldap_faker import ObjectStore, LDAPServerFactory, FakeLDAP
>>> data = [ ... ] # some LDAP records
>>> factory = LDAPServerFactory()
>>> store = ObjectStore(objects=data)
>>> factory.register(store)
>>> fake_ldap = FakeLDAP(factory)
Now any time your code does an ``ldap.initialize()`` to our patched
version of that function, it will get a a :py:class:`FakeLDAPObject`
configured with a :py:func:`copy.deepcopy` of the
:py:class:`ObjectStore` ``store``, no matter what ``uri`` it passes to
``ldap.initialize()``.
To register a different ``ObjectStores`` that will be used for specific
``uris``:
>>> from ldap_faker import ObjectStore, Servers
>>> data1 = [ ... ] # some LDAP records
>>> factory = LDAPServerFactory()
>>> store1 = ObjectStore(objects=data1)
>>> factory.register(store1, uri='ldap://server1')
>>> data2 = [ ... ] # some different LDAP records
>>> store2 = ObjectStore(objects=data2)
>>> factory.register(store2, uri='ldap://server2')
>>> fake_ldap = FakeLDAP(factory)
Now if your code does ``ldap.initialize('ldap://server1')``, it will get
a :py:class:`FakeLDAPObject` configured with a :py:func:`copy.deepcopy`
of the :py:class:`ObjectStore` object ``store1``, while if it does
``ldap.initialize('ldap://server2' )``, it will get a
:py:class:`FakeLDAPObject` configured with a :py:func:`copy.deepcopy` of
the :py:class:`ObjectStore` object ``store2``.
"""
def __init__(self) -> None:
self.servers: dict[str, ObjectStore] = {}
self.default: ObjectStore | None = None
[docs] def load_from_file(
self, filename: str, uri: str | None = None, tags: list[str] | None = None
) -> None:
"""
Given a file path to a JSON file with the objects for an
:py:class:`ObjectStore`, create a new :py:class:`ObjectStore`, load it
with that JSON File and register it with uri of ``uri``.
Args:
filename: the full path to our JSON file
Keyword Args:
uri: the uri to assign to the :py:class:`ObjectStore` we create
tags: the list of tags to apply to the the :py:class:`ObjectStore`
Raises:
ValueError: raised if a default is already configured while trying to
register the :py:class:`ObjectStore` with a specific ``uri``
RuntimeWarning: raised if we try to overwrite an already registered object
store with our new one
"""
if not tags:
tags = []
store = ObjectStore(tags=tags)
store.load_objects(filename)
self.register(store, uri=uri)
[docs] def register(self, store: ObjectStore, uri: str | None = None) -> None:
"""
Register a new :py:class:`ObjectStore` to be used as our fake LDAP
server for when we run our fake ``initialize`` function.
Args:
store: a configured :py:class:`ObjectStore`
Keyword Args:
uri: the LDAP uri to associated with ``directory``
Raises:
ValueError: raised if a default is already configured while trying to
register an :py:class:`ObjectStore` with a specific ``uri``
RuntimeWarning: raised if we try to overwrite an already registered object
store with a new one
"""
if uri and self.default is not None:
msg = (
f'You cannot regster an ObjectStore for uri="{uri}" because '
"a default server has already been set"
)
raise ValueError(msg)
if not uri:
if self.default is not None:
warnings.warn(
"LDAPServerFactory: overriding existing default ObjectStore",
RuntimeWarning,
stacklevel=2,
)
self.default = store
if uri in self.servers:
warnings.warn(
f"LDAPServerFactory: overriding existing ObjectStore for uri={uri}",
RuntimeWarning,
stacklevel=2,
)
self.servers[cast("str", uri)] = store
[docs] def get(self, uri: str) -> ObjectStore:
"""
Return a :py:func:`copy.deepcopy` of the :py:class:`ObjectStore`
identified by ``uri``.
Args:
uri: use this uri to look up which :py:class:`ObjectStore` to use
Raises:
ldap.SERVER_DOWN: no :py:class:`ObjectStore` could be found for ``uri``
Returns:
A :py:func:`copy.deepcopy` of the :py:class:`ObjectStore`
"""
if self.default is not None:
return deepcopy(self.default)
try:
return deepcopy(self.servers[uri])
except KeyError as exc:
raise ldap.SERVER_DOWN({"desc": "Can't contact LDAP Server"}) from exc # type: ignore[attr-defined]
[docs]class CallHistory:
"""
Records the ``python-ldap`` call history for a particular
:py:class:`FakeLDAPObject` as :py:class:`LDAPCallRecord` objects. It works
in conjunction with the ``@record_call`` decorator. An
:py:class:`CallHistory` object will be configured on each
:py:class:`FakeLDAPObject` and on each :py:class:`FakeLDAP` object capture
their call history.
We use this in our tests with appropriate asserts to ensure that our code
called the ``python-ldap`` methods we expected, in the order we expected,
with the arguments we expected.
"""
def __init__(self, calls: list[LDAPCallRecord] | None = None):
self._calls: list[LDAPCallRecord] = []
if calls:
self._calls = calls
def register(self, api_name: str, arguments: dict[str, Any]) -> None:
"""
Register a new call record. This is used by the ``@record_call``
decorator to register :py:class:`FakeLDAPObject` method calls.
Args:
api_name: the name of the function or method called
arguments: a dict where the keys are argument names, and the values
are passed in values for those arguments
:meta private:
"""
self._calls.append(LDAPCallRecord(api_name, arguments))
[docs] def filter_calls(self, api_name: str) -> list[LDAPCallRecord]:
"""
Filter our call history by function name.
Args:
api_name: look through our history for calls to this function
Returns:
A list of (``api_name``, ``arguments``) tuples in the order in which the
calls were made. Arguments is a ``Dict[str, Any]``.
"""
return [call for call in self._calls if call.api_name == api_name]
@property
def calls(self) -> list[LDAPCallRecord]:
"""
Returns the list of all calls made against the parent object.
Example:
To test that your code did a :py:func:`ldap.simple_bind_s` call with
the usernam and password you expected, you could do::
from unittest import TestCase
import ldap
from ldap_faker import LDAPFakerMixin
from my_code import App
class MyTest(LDAPFakerMixin, TestCase):
ldap_modules = ['my_code']
ldap_fixtures = 'myfixture.json'
def test_option_was_set(self):
app = MyApp()
app.do_the_thing()
conn = self.ldap_faker.connections[0]
self.assertEqual(
conn.calls,
[('simple_bind_s', {'who': 'uid=foo,ou=dept,o=org,c=country', 'cred': 'pass'})]
)
Returns:
Returns a list of 2-tuples, one for each method call made since
the last reset. Each tuple contains the name of the API and a dictionary
of arguments. Argument defaults are included.
""" # noqa: E501
return self._calls
@property
def names(self) -> list[str]:
"""
Returns the list names of ``python-ldap`` functions or methods called, in
the order they were called. You can use this to test whether an particulary
Example:
To test that your code did at least one :py:func:`ldap.add_s` call, you
could do::
from unittest import TestCase
import ldap
from ldap_faker import LDAPFakerMixin
from my_code import App
class MyTest(LDAPFakerMixin, TestCase):
ldap_modules = ['my_code']
ldap_fixtures = 'myfixture.json'
def test_option_was_set(self):
app = MyApp()
app.do_the_thing()
conn = self.ldap_faker.connections[0]
self.assertEqual('add_s" in conn.calls.names)
Returns:
A list of method names, in the order they were called.
"""
return [call.api_name for call in self._calls]
[docs]class OptionStore:
"""
We use this to store options set via ``set_option``.
"""
def __init__(self) -> None:
self.options: LDAPOptionStore = {}
[docs] def set(self, option: int, invalue: LDAPOptionValue) -> None:
"""
Set an option.
Args:
option: the code for the option (e.g. :py:data:`ldap.OPT_X_TLS_NEWCTX`)
invalue: the value we want the option to be set to
Raises:
ValueError: ``option`` is not a valid ``python-ldap`` option
"""
if option not in ldap.OPT_NAMES_DICT: # type: ignore[reportUnknownVariableType]
msg = f"unknown option {option}"
raise ValueError(msg)
self.options[option] = invalue
[docs] def get(self, option: int) -> LDAPOptionValue | dict[str, int | str]:
"""
Get the value for a previosly set option that was set via
:py:meth:`OptionStore.set`.
Args:
option: the code for the option (e.g. :py:data:`ldap.OPT_X_TLS_NEWCTX`)
Raises:
ValueError: ``option`` is not a valid ``python-ldap`` option
Returns:
The value for the option, or the default.
"""
if option not in ldap.OPT_NAMES_DICT: # type: ignore[reportUnknownVariableType]
msg = f"unknown option {option}"
raise ValueError(msg)
if option in (ldap.OPT_API_INFO, ldap.OPT_SUCCESS): # type: ignore[attr-defined]
# Even though we declare the output as "int | str", openldap at
# least returns a dict for this
return {
"info_version": 1,
"api_version": 3001,
"vendor_name": "python-ldap-faker",
"vendor_version": "1.0.0",
}
if option == ldap.OPT_PROTOCOL_VERSION: # type: ignore[attr-defined]
return 3
return self.options[option]
[docs]class ObjectStore:
"""
Represents our actual simulated LDAP object store. Copies of this
will be used to configure :py:class:`FakeLDAPObject` objects.
"""
_QUERY_RE: re.Pattern[str] = re.compile(r"\(\w+=.+\)$")
_DEFAULT_SEARCH_RE: re.Pattern[str] = re.compile(
r"^\(objectclass=*\)$", re.IGNORECASE
)
def __init__(self, tags: list[str] | None = None):
# raw_objects preserves the object attribute case as it was given to us
# by register_object, and retains the values as List[bytes]
self.raw_objects: RawLDAPObjectStore = (
RawLDAPObjectStore()
) #: LDAP records as they would have been returned by ``python-ldap```
# objects has the same data as raw_objects, but here we forces the
# attribute names on each object to be case insensitive, and we convert
# values to List[str]. We need that because in LDAP searches, attribute
# names and values are compared as case-insensitive, and Filter.match()
# expects the filter and values to be strings
self.objects: LDAPObjectStore = (
LDAPObjectStore()
) #: LDAP records set up to make searching better
self.tags: list[str] = (
tags if tags is not None else []
) #: used when filtering hooks to apply
self.controls: dict[str, Any] = {} #: can be used by hooks to store state
self.operational_attributes: set[str] = (
set()
) #: list of attributes that have to be specifically requested
for hook_func in hooks.get("post_objectstore_init", self.tags): # type: ignore[reportUnknownVariableType]
hook_func(self)
[docs] def convert_LDAPData(self, data: LDAPData) -> CILDAPData: # noqa: N802
"""
Convert an incoming ``LDAPData` dict (``Dict[str, List[bytes]``])
to a ``CILDAPData`` dict (``CaseInsensitiveDict[str, List[str]])``)
We need the data dict to have values as ``List[str]`` so that our
filtering works properly -- ``ldap_filter.Filter.match`` only works with
strings, not bytes.
Args:
data: the LDAPData dict to convert
Returns:
The convered CILDAPData dict.
"""
# We need self.objects to be a case insensitive Dict[str, List[str]]
# so that filtering works like it would in a real ldap serverindee
d: dict[str, Any] = deepcopy(data)
for key, value in d.items():
d[key] = [v.decode("utf8") for v in value]
return CILDAPData(d)
## Object store construction
[docs] def load_objects(self, filename: str) -> None:
"""
Load a list of LDAP records stored as JSON from a file into our internal
database. Use this when
setting up the data you will use to run your tests.
Note:
One caveat with this method vs.
:py:meth:`ObjectStore.register_objects` is that the records returned
by ``python-ldap`` are of type ``tuple[str, dict[str,
list[bytes]]]`` but JSON has no concept of ``bytes`` or ``tuple``.
Thus we will expect the LDAP records in the file to have type
``list[str, dict[str, list[str]]]`` and we will convert them to
``tuple[str, dict[str, list[bytes]]]`` before saving to
:py:attr:`raw_objects`
Args:
filename: the path to the JSON file to load
Raises:
ldap.ALREADY_EXISTS: there is already an object in our object store
with this dn
ldap.INVALID_DN_SYNTAX: one of the object DNs is not well formed
"""
for hook_func in hooks.get("pre_load_objects", self.tags):
hook_func(self, filename)
with Path(filename).open(encoding="utf-8") as fd:
objects = json.load(fd)
for obj in objects:
dn, data = obj
new_data: LDAPData = {}
for attr, value in data.items():
new_data[attr] = [entry.encode("utf-8") for entry in value]
self.register_object((dn, new_data))
for hook_func in hooks.get("post_load_objects", self.tags):
hook_func(self)
[docs] def register_objects(self, objs: list[LDAPRecord]) -> None:
"""
Load a list of LDAP records into our internal database. Use this when
setting up the data you will use to run your tests. Each record in the
list should be in exactly the format that ``python-ldap`` itself returns: a
2-tuple with dn as the first element and the attribute/value dict as the
second element.
Example:
Adding a several PosixAccount objects:
>>> obj = [
(
'uid=user,ou=mydept,o=myorg,c=country',
{
'cn': [b'Firstname User1'],
'uid': [b'user'],
'uidNumber': [b'123'],
'gidNumber': [b'456'],
'homeDirectory': [b'/home/user'],
'loginShell': [b'/bin/bash'],
'userPassword': [b'the password'],
'objectclass': [b'posixAccount', b'top']
}
),
(
'uid=user2,ou=mydept,o=myorg,c=country',
{
'cn': [b'Firstname User2'],
'uid': [b'user2'],
'uidNumber': [b'124'],
'gidNumber': [b'457'],
'homeDirectory': [b'/home/user1'],
'loginShell': [b'/bin/bash'],
'userPassword': [b'the password'],
'objectclass': [b'posixAccount', b'top']
}
)
]
>>> directory = ObjectStore()
>>> directory.register_objects(obj)
Args:
objs: A list of LDAP records as they would have been returned by
``ldap.ldapobject.LDAPObject.search_s()``. These are 2-tuples, where
the first element is the `dn` (a ``str``) and the second element is
a dict where the keys are ``str`` and the values are lists of
``bytes``.
Raises:
ldap.ALREADY_EXISTS: there is already an object in our object store
with this dn
ldap.INVALID_DN_SYNTAX: one of the object DNs is not well formed
TypeError: the LDAPData portion for an object was not of type
``Dict[str, List[bytes]]``
"""
for hook_func in hooks.get("pre_register_objects", self.tags):
hook_func(self, objs)
for obj in objs:
self.register_object(obj)
for hook_func in hooks.get("post_register_objects", self.tags):
hook_func(self)
[docs] def register_object(self, obj: LDAPRecord) -> None:
"""
Add an LDAP record our internal database. Use this to add a single
record when setting up the data you will use to run your tests. The
data should be in exactly the format that python-ldap itself returns: a
2-tuple with dn as the first element and the attribute/value dict as the
second element.
Example:
Adding a PosixAccount object:
>>> obj = (
'uid=user,ou=mydept,o=myorg,c=country',
{
'cn': [b'Firstname Lastname'],
'uid': [b'user'],
'uidNumber': [b'123'],
'gidNumber': [b'456'],
'homeDirectory': [b'/home/user'],
'loginShell': [b'/bin/bash'],
'userPassword': [b'the password']
'objectclass': [b'posixAccount', b'top']
}
)
>>> directory = ObjectStore()
>>> directory.register_object(obj)
Args:
obj: An LDAP record as it would have been returned by
``ldap.ldapobject.LDAPObject.search_s()``. This is a 2-tuple, where
the first element is the `dn` (a ``str``) and the second element is
a dict where the keys are ``str`` and the values are lists of
``bytes``.
Raises:
ldap.ALREADY_EXISTS: there is already an object in our object store
with this dn
ldap.INVALID_DN_SYNTAX: the DN is not well formed
TypeError: the LDAPData portion was not of type ``dict[str, list[bytes]]``
"""
for hook_func in hooks.get("pre_register_object", self.tags):
hook_func(self, obj)
if self.exists(obj[0]):
raise ldap.ALREADY_EXISTS({"desc": "Object already exists"}) # type: ignore[attr-defined]
self.set(obj[0], obj[1])
for hook_func in hooks.get("post_register_object", self.tags):
hook_func(self, obj)
# Helpers
def __is_default_filter(self, filterstr: str) -> bool:
"""
Test whether ``filterstr`` is the default filter string ``(objectClass=*)``.
Args:
filterstr: an LDAP filter string
Returns:
Returns ``True`` if ``filterstr`` is the default filter string
case-insensitive, ``False`` otherwise.
"""
return bool(self._DEFAULT_SEARCH_RE.search(filterstr))
def __check_bytes(self, value: Any) -> None:
"""
Check a value list, ensuring that we got ``List[bytes]`` and not another type.
Args:
value: the value to check
Raises:
TypeError: ``value`` is not a ``List[bytes]``
"""
for v in value:
if not isinstance(v, bytes):
msg = (
f"('Tuple_to_LDAPMod(): expected a byte string in the list', '{v}')"
)
raise TypeError(msg)
def __validate_dn(self, dn: str, operation: int = ldap.RES_ANY) -> None: # type: ignore[attr-defined]
"""
Validate that ``dn`` is a well formed DN.
Args:
dn: the DN to validate
Keyword Args:
operation: the ``msgtype`` to set on the exception
Raises:
ldap.INVALID_DN_SYNTAX: the dn was not well-formed
"""
if not ldap.dn.is_dn(dn): # type: ignore[attr-defined]
raise ldap.INVALID_DN_SYNTAX( # type: ignore[attr-defined]
{
"msgtype": operation,
"msgid": 3,
"result": 34,
"desc": "Invalid DN syntax",
"ctrls": [],
"info": "DN value invalid per syntax\n",
}
)
def __validate_LDAPRecord(self, obj: LDAPRecord) -> None: # noqa: N802
dn, data = obj
self.__validate_dn(dn)
for attr, value in data.items():
if not isinstance(attr, str):
msg = f"attributes must be of type str: '{attr!r}'"
raise TypeError(msg)
if not isinstance(value, list):
msg = f"values nust be of type List[bytes]: '{value!r}'"
raise TypeError(msg)
for v in value:
if not isinstance(v, bytes):
msg = f"values nust be of type List[bytes]: '{v!r}'"
raise TypeError(msg)
def __parse_filterstr(self, filterstr: str) -> Any:
try:
filt = Filter.parse(filterstr)
except ParseError as exc:
raise ldap.FILTER_ERROR( # type: ignore[attr-defined]
{
"result": -7,
"desc": "Bad search filter",
"errno": 35,
"ctrls": [],
"info": "Resource temporarily unavailable",
}
) from exc
return filt
def __filter_attributes(
self,
obj: LDAPData,
attrlist: list[str] | None = None,
include_operational_attributes: bool = False,
) -> LDAPData:
"""
Return just the attributes on ``obj`` named in ``attrlist``. If
``attrlist`` is ``None`` or "``*``" is in ``attrlist``, return all
attributes on ``obj``.
Any attribute named in :py:attr:`operational_attributes` will be omitted
unless specifically named ``attrlist``.
Note:
We return a :py:func:`copy.deepcopy` of the object, not the actual
object. This ensures that if the caller modifies the object they
don't update the objects in us unintentionally.
Args:
obj: the data for an LDAP object
Keyword Args:
attrlist: a list of attributes to include on ``obj``, removing
attributes not named
include_operational_attributes: include all operational attributes
even if they weren't requested in ``attrlist``
Returns:
A filtered version of ``obj`` with only the attributes named in
``attrlist``, omitting the operational attributes unless specifically
requested.
"""
if not attrlist:
attrlist = ["*"]
obj_attrs = set()
if attrlist and "*" in attrlist:
obj_attrs = set(obj.keys())
if not include_operational_attributes:
obj_attrs -= self.operational_attributes
if attrlist:
obj_attrs.update({attr for attr in attrlist if attr != "*"})
_obj_attrs: Attrlist = Attrlist()
for attr in obj_attrs:
_obj_attrs[attr] = attr
return {
_obj_attrs[attr]: deepcopy(value)
for attr, value in obj.items()
if attr in _obj_attrs
}
# Main methods
@property
def count(self):
return len(self.objects)
[docs] def exists(self, dn: str, validate: bool = True) -> bool:
"""
Test whether an object with dn ``dn`` exists.
Args:
dn: the dn of the object to look for
Keyword Args:
validate: if ``True``, validate that ``dn`` is a valid dn
Returns:
``True`` if the object exists, ``False`` otherwise.
"""
if validate:
self.__validate_dn(dn, ldap.RES_SEARCH_ENTRY) # type: ignore[attr-defined]
return dn in self.objects
[docs] def get(self, dn: str) -> LDAPData:
"""
Return all data for an object from our object store.
Args:
dn: the dn of the object to copy.
Raises:
ldap.NO_SUCH_OBJECT: no object with dn of ``dn`` exists in our object store
Returns:
The data for an LDAP object
"""
self.__validate_dn(dn, ldap.RES_SEARCH_ENTRY) # type: ignore[attr-defined]
try:
return self.raw_objects[dn]
except KeyError as exc:
raise ldap.NO_SUCH_OBJECT( # type: ignore[attr-defined]
{
"msgtype": 101,
"msgid": 4,
"result": 32,
"desc": "No such object",
"ctrls": [],
}
) from exc
[docs] def copy(self, dn: str) -> LDAPData:
"""
Return a copy of the data for an object from our object store.
Args:
dn: the dn of the object to copy.
Raises:
ldap.NO_SUCH_OBJECT: no object with dn of ``dn`` exists in our object store
Returns:
The data for an LDAP object
"""
self.__validate_dn(dn, ldap.RES_SEARCH_ENTRY) # type: ignore[attr-defined]
for hook_func in hooks.get("pre_copy", self.tags):
hook_func(self, dn)
data = deepcopy(self.get(dn))
for hook_func in hooks.get("post_copy", self.tags):
data = hook_func(self, data)
return data
def _set(self, dn: str, data: LDAPData) -> None:
"""
Add or update data for the object with dn ``dn``. This differs
from :py:meth:`set` in that no hooks will be applied.
Args:
dn: the dn of the object to copy.
data: the dict of data for this object
Raises:
ldap.INVALID_DN_SYNTAX: the DN is not well formed
TypeError: the LDAPData portion was not of type ``Dict[str, List[bytes]]``
"""
self.__validate_LDAPRecord((dn, data))
self.raw_objects[dn] = data
self.objects[dn] = self.convert_LDAPData(data)
[docs] def set(self, dn: str, data: LDAPData, bind_dn: str | None = None) -> None:
"""
Add or update data for the object with dn ``dn``.
Args:
dn: the dn of the object to copy.
data: the dict of data for this object
Keyword Args:
bind_dn: the dn of the user doing the set, if any
Raises:
ldap.INVALID_DN_SYNTAX: the DN is not well formed
TypeError: the LDAPData portion was not of type ``Dict[str, List[bytes]]``
"""
self.__validate_LDAPRecord((dn, data))
for hook_func in hooks.get("pre_set", self.tags):
hook_func(self, (dn, data), bind_dn)
self._set(dn, data)
self.raw_objects[dn] = data
self.objects[dn] = self.convert_LDAPData(data)
for hook_func in hooks.get("post_set", self.tags):
hook_func(self, (dn, data), bind_dn)
[docs] def update(self, dn: str, modlist: ModList, bind_dn: str | None = None) -> None: # noqa: PLR0912
"""
Modify the object with dn of ``dn`` using the modlist ``modlist``.
Each element in the list modlist should be a tuple of the form
``(mod_op: int, mod_type: str, mod_vals: bytes | List[bytes])``, where
``mod_op`` indicates the operation (one of :py:attr:`ldap.MOD_ADD`,
:py:attr:`ldap.MOD_DELETE`, or :py:attr:`ldap.MOD_REPLACE`, ``mod_type``
is a string indicating the attribute type name, and ``mod_vals`` is
either a bytes value or a list of bytes values to add, delete or
replace respectively. For the delete operation, ``mod_vals`` may be ``None``
indicating that all attributes are to be deleted.
Note:
:py:func:`ldap.modlist.modifyModlist` MAY be your friend here for
generating modlists. Do read the note in those docs about
:py:attr:`ldap.MOD_DELETE` / :py:attr:`ldap.MOD_ADD` vs.
:py:attr:`ldap.MOD_REPLACE` to see whether that will affect you poorly.
Example:
Here is an example of constructing a modlist for ``modify_s``:
>>> import ldap
>>> modlist = [
(ldap.MOD_ADD, 'mail', [b'user@example.com', b'user+foo@example.com']),
(ldap.MOD_REPLACE, 'cn', [b'My Name']),
(ldap.MOD_DELETE, 'gecos', None)
]
Args:
dn: the dn of the object to delete
modlist: a modlist suitable for ``modify_s``
Keyword Args:
bind_dn: the dn of the user doing the update, if any
Raises:
ldap.INVALID_DN_SYNTAX: the dn was not well-formed
ldap.NO_SUCH_OBJECT: no object with dn of ``dn`` exists in our
object store
ldap.TYPE_OR_VALUE_EXISTS: you tried to add an value to an
attribute, but it was already in the value list
ldap.INSUFFICIENT_ACCESS: you need to do a non-anonymous bind before
doing this
"""
def get_overlaps(source: list[bytes], other: list[bytes]) -> set[bytes]:
"""
Given two lists of bytes, return a set of the items that are in
both, ignoring the case of the values.
Args:
source: the source list
other: the list of values to compare with
Returns:
The list of items that are in both. These items will be lowercased.
"""
current = {v.lower() for v in source}
updates = {v.lower() for v in other}
return current.intersection(updates)
self.__validate_dn(dn, ldap.RES_MODIFY) # type: ignore[attr-defined]
for hook_func in hooks.get("pre_update", self.tags):
hook_func(self, dn, modlist, bind_dn)
# We want to use a deepcopy of our data here so we don't act directly
# on the data in the store before we do our self.set(); something may
# go wrong partway through the update (e.g. bad opcode) and we don't
# want partial updates to be reflected.
# may need to look at what the object used to look like in a "pre_set"
# hook.
# We can't use self.copy(dn) here, because hooks may muck with the data
# returned, thus this:
obj = deepcopy(self.get(dn))
for item in modlist:
op, key, value = item
if op not in (ldap.MOD_ADD, ldap.MOD_DELETE, ldap.MOD_REPLACE): # type: ignore[attr-defined]
raise ldap.PROTOCOL_ERROR( # type: ignore[attr-defined]
{
"msgtype": ldap.RES_MODIFY, # type: ignore[attr-defined]
"msgid": 4,
"result": 2,
"desc": "Protocol error",
"info": "unrecognized modify operation",
"ctrls": [],
}
)
if op == ldap.MOD_ADD: # type: ignore[attr-defined]
self.__check_bytes(value)
if key not in obj:
obj[key] = value
else:
# Enforce case-insensitive uniqueness in the value list
overlaps = get_overlaps(obj[key], value)
if not overlaps:
obj[key].extend(value)
else:
raise ldap.TYPE_OR_VALUE_EXISTS( # type: ignore[attr-defined]
{
"msgtype": ldap.RES_MODIFY, # type: ignore[attr-defined]
"msgid": 4,
"result": 20,
"desc": "Type or value exists",
"ctrls": [],
}
)
elif op == ldap.MOD_DELETE: # type: ignore[attr-defined]
if value is None:
# If value was None, delete the whole attribute
del obj[key]
else:
# otherwise just remove the values from value from obj[key]
self.__check_bytes(value)
overlaps = get_overlaps(obj[key], value)
obj[key] = [v for v in obj[key] if v.lower() not in overlaps]
elif op == ldap.MOD_REPLACE: # type: ignore[attr-defined]
self.__check_bytes(value)
obj[key] = value
self.set(dn, obj, bind_dn=bind_dn)
for hook_func in hooks.get("post_update", self.tags):
hook_func(self, obj, bind_dn)
[docs] def create(self, dn: str, modlist: AddModList, bind_dn: str | None = None) -> None:
"""
Create an object in our store with dn of ``dn``.
``modlist`` is similar the one passed to :py:meth:`modify_s`, except
that the operation integer is omitted from the tuples in ``modlist``.
You might want to look into sub-module ldap.modlist for generating the
modlist.
Example:
Here is an example of constructing a modlist for ``create``:
>>> modlist = [
('uid', [b'user']),
('gidNumber', [b'1000']),
('uidNumber', [b'1000']),
('loginShell', [b'/bin/bash']),
('homeDirectory', [b'/home/user']),
('userPassword', [b'the password']),
('cn', [b'My Name']),
('objectClass', [b'top', b'posixAccount']),
]
Args:
dn: the dn of the object to add
modlist: the add modlist
Keyword Args:
bind_dn: the dn of the user doing the create, if any
Raises:
ldap.INVALID_DN_SYNTAX: the dn was not well-formed
ldap.ALREADY_EXISTS: an object with dn of ``dn`` already exists in
our object store
ldap.INSUFFICIENT_ACCESS: you need to do a non-anonymous bind before
doing this
"""
self.__validate_dn(dn, ldap.RES_ADD) # type: ignore[attr-defined]
for hook_func in hooks.get("pre_create", self.tags):
hook_func(self, dn, modlist, bind_dn)
if self.exists(dn):
# TODO: probably this error dict is not complete
raise ldap.ALREADY_EXISTS( # type: ignore[attr-defined]
{"info": "", "desc": "Object already exists"}
)
entry = {}
for item in modlist:
attr, value = item
if not isinstance(attr, str):
msg = f"Tuple_to_LDAPMod() argument 1 must be str, not {type(attr)}"
raise TypeError(msg)
self.__check_bytes(value)
entry[attr] = value
self.set(dn, entry, bind_dn=bind_dn)
for hook_func in hooks.get("post_create", self.tags):
hook_func(self, (dn, entry), bind_dn)
[docs] def delete(self, dn: str, bind_dn: str | None = None) -> None:
"""
Delete an object from our objects directory.
Args:
dn: the dn of the object to delete
Keyword Args:
bind_dn: the dn of the user doing the delete, if any
Raises:
ldap.INVALID_DN_SYNTAX: the dn was not well-formed
"""
self.__validate_dn(dn, ldap.RES_DELETE) # type: ignore[attr-defined]
if not self.exists(dn):
raise ldap.NO_SUCH_OBJECT( # type: ignore[attr-defined]
{
"msgtype": 101,
"msgid": 4,
"result": 32,
"desc": "No such object",
"ctrls": [],
}
)
obj = self.copy(dn)
for hook_func in hooks.get("pre_delete", self.tags):
hook_func(self, (dn, obj), bind_dn=bind_dn)
del self.objects[dn]
del self.raw_objects[dn]
for hook_func in hooks.get("post_delete", self.tags):
hook_func(self, (dn, obj), bind_dn=bind_dn)
[docs] def search_base(
self,
base: str,
filterstr: str,
attrlist: list[str] | None = None,
) -> LDAPSearchResult:
"""
Do a :py:data:`ldap.SCOPE_BASE` search. Return the requested attributes
of the object in our object store with ``dn`` of ``base`` that also
matches ``filterstr``.
Note:
We return a :py:func:`copy.deepcopy` of the object, not the actual
object. This ensures that if the caller modifies the object they
don't update the objects in us unintentionally.
Note:
Some attributes are "operational" and are not returned by default
They must be named specifically if you want them. Example:
>>> store.search_base(
'thebasedn', '(objectclass=*)', ['*', 'createTimestamp'])
Args:
base: the dn of the object to return
filterstr: the ldap filter string
Keyword Args:
attrlist: the list of attributes to return for each object
Raises:
ldap.INVALID_DN_SYNTAX: ``base`` was not a well-formed DN
ldap.FILTER_ERROR: ``filterstr`` is has bad filter syntax
ldap.NO_SUCH_OBJECT: no object with dn of ``base`` exists in the
object store
Returns:
A list with one element -- the object with dn of ``base``.
"""
self.__validate_dn(base, ldap.RES_SEARCH_RESULT) # type: ignore[attr-defined]
data = self.get(base)
ci_data = self.objects[base]
results: LDAPSearchResult = []
if not self.__is_default_filter(filterstr):
filt = self.__parse_filterstr(filterstr)
if filt.match(ci_data):
# We need to do the filter against the the case-insensitive
# versions of our attribute names, because that's how LDAP
# works. Filter.match() will take care of doing
# case-insensitive value comparisons
results.append((base, self.__filter_attributes(data, attrlist)))
else:
results.append((base, self.__filter_attributes(data, attrlist)))
return results
[docs] def search_onelevel(
self,
base: str,
filterstr: str,
attrlist: list[str] | None = None,
) -> LDAPSearchResult:
"""
Do a :py:data:`ldap.SCOPE_ONELEVEL` search, for objects directly under
basedn ``base`` that match ``filterstr``.
Note:
We return a :py:func:`copy.deepcopy` of each object, not the actual
object. This ensures that if the caller modifies the object they
don't update the objects in us unintentionally.
Args:
base: the dn of the object to return
filterstr: the ldap filter string
Keyword Args:
attrlist: the list of attributes to return for each object
Raises:
ldap.INVALID_DN_SYNTAX: ``base`` was not a well-formed DN
ldap.FILTER_ERROR: ``filterstr`` is has bad filter syntax
Returns:
A list of LDAP objects -- 2-tuples of ``(dn, data)``.
"""
self.__validate_dn(base, ldap.RES_SEARCH_RESULT) # type: ignore[attr-defined]
basedn_parts = ldap.dn.explode_dn(base.lower(), flags=ldap.DN_FORMAT_LDAPV3) # type: ignore[attr-defined]
filt = None
if not self._DEFAULT_SEARCH_RE.search(filterstr):
filt = self.__parse_filterstr(filterstr)
results: LDAPSearchResult = []
for dn, data in self.objects.items():
dn_parts = ldap.dn.explode_dn(dn.lower(), flags=ldap.DN_FORMAT_LDAPV3) # type: ignore[attr-defined]
if dn_parts[1:] != basedn_parts:
continue
if filt:
if filt.match(data):
results.append(
(dn, self.__filter_attributes(self.raw_objects[dn], attrlist))
)
else:
results.append(
(dn, self.__filter_attributes(self.raw_objects[dn], attrlist))
)
return results
[docs] def search_subtree(
self,
base: str,
filterstr: str,
attrlist: list[str] | None = None,
include_operational_attributes: bool = False,
) -> LDAPSearchResult:
"""
Do a :py:data:`ldap.SCOPE_SUBTREE` search, for objects under basedn
``base`` that match ``filterstr``.
Args:
base: the dn of the object to return
filterstr: the ldap filter string
Note:
We return a :py:func:`copy.deepcopy` of each object, not the actual
object. This ensures that if the caller modifies the object they
don't update the objects in us unintentionally.
Keyword Args:
attrlist: the list of attributes to return for each object
include_operational_attributes: include all operational attributes even
if they are not named in ``attrlist``
Raises:
ldap.INVALID_DN_SYNTAX: ``base`` was not a well-formed DN
ldap.FILTER_ERROR: ``filterstr`` is has bad filter syntax
Returns:
A list of LDAP objects -- 2-tuples of ``(dn, data)``.
"""
self.__validate_dn(base, ldap.RES_SEARCH_RESULT) # type: ignore[attr-defined]
basedn_parts = ldap.dn.explode_dn(base.lower(), flags=ldap.DN_FORMAT_LDAPV3) # type: ignore[attr-defined]
filt = None
if not self._DEFAULT_SEARCH_RE.search(filterstr):
filt = self.__parse_filterstr(filterstr)
results: LDAPSearchResult = []
for dn, data in self.objects.items():
if basedn_parts:
# ``base`` was not the Root DN, so see if the object is under ``base``
dn_parts = ldap.dn.explode_dn(dn.lower(), flags=ldap.DN_FORMAT_LDAPV3) # type: ignore[attr-defined]
if dn_parts[-len(basedn_parts) :] != basedn_parts:
continue
if filt:
if filt.match(data):
results.append(
(
dn,
self.__filter_attributes(
self.raw_objects[dn],
attrlist,
include_operational_attributes=include_operational_attributes,
),
)
)
else:
results.append(
(
dn,
self.__filter_attributes(
self.raw_objects[dn],
attrlist,
include_operational_attributes=include_operational_attributes,
),
)
)
return results