patroni.utils module

Utility objects and functions that can be used throughout Patroni code.

var tzutc:

UTC time zone info object.

var logger:

logger of this module.

var USER_AGENT:

identifies the Patroni version, Python version, and the underlying platform.

var OCT_RE:

regular expression to match octal numbers, signed or unsigned.

var DEC_RE:

regular expression to match decimal numbers, signed or unsigned.

var HEX_RE:

regular expression to match hex strings, signed or unsigned.

var DBL_RE:

regular expression to match double precision numbers, signed or unsigned. Matches scientific notation too.

var WHITESPACE_RE:

regular expression to match whitespace characters.

class patroni.utils.Retry(max_tries: int | None = 1, delay: float = 0.1, backoff: int = 2, max_jitter: float = 0.8, max_delay: int = 3600, sleep_func: Callable[[int | float], None] = <function _sleep>, deadline: int | float | None = None, retry_exceptions: Type[Exception] | Tuple[Type[Exception], ...] = <class 'patroni.exceptions.PatroniException'>)

Bases: object

Helper for retrying a method in the face of retryable exceptions.

Variables:
  • max_tries – how many times to retry the command.

  • delay – initial delay between retry attempts.

  • backoff – backoff multiplier between retry attempts.

  • max_jitter – additional max jitter period to wait between retry attempts to avoid slamming the server.

  • max_delay – maximum delay in seconds, regardless of other backoff settings.

  • sleep_func – function used to introduce artificial delays.

  • deadline – timeout for operation retries.

  • retry_exceptions – a single exception type, or a tuple of exception types, that should trigger a retry.

__init__(max_tries: int | None = 1, delay: float = 0.1, backoff: int = 2, max_jitter: float = 0.8, max_delay: int = 3600, sleep_func: Callable[[int | float], None] = <function _sleep>, deadline: int | float | None = None, retry_exceptions: Type[Exception] | Tuple[Type[Exception], ...] = <class 'patroni.exceptions.PatroniException'>) -> None

Create a Retry instance for retrying function calls.

Parameters:
  • max_tries – how many times to retry the command. -1 means infinite tries.

  • delay – initial delay between retry attempts.

  • backoff – backoff multiplier between retry attempts. Defaults to 2 for exponential backoff.

  • max_jitter – additional max jitter period to wait between retry attempts to avoid slamming the server.

  • max_delay – maximum delay in seconds, regardless of other backoff settings.

  • sleep_func – function used to introduce artificial delays.

  • deadline – timeout for operation retries.

  • retry_exceptions – a single exception type, or a tuple of exception types, that should trigger a retry.

copy() -> Retry

Return a clone of this retry manager.

ensure_deadline(timeout: float, raise_ex: Exception | None = None) -> bool

Calculates and checks the remaining deadline time.

Parameters:
  • timeout – if the deadline is smaller than the provided timeout value, raise the raise_ex exception.

  • raise_ex – the exception object that will be raised if the deadline is smaller than provided timeout.

Returns:

False if deadline is smaller than a provided timeout and raise_ex isn’t set. Otherwise True.

Raises:

Exception: raise_ex if calculated deadline is smaller than provided timeout.

reset() -> None

Reset the attempt counter, delay and stop time.

property sleeptime: float

Get next cycle sleep time.

It is based on the current delay plus a number up to max_jitter.

property stoptime: float

Get the current stop time.

update_delay() -> None

Set next cycle delay.

It will be the minimum value between:

  • current delay with backoff; or

  • max_delay.
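
The interplay between delay, backoff, max_delay, and max_jitter can be illustrated with a minimal, self-contained sketch. It does not use Patroni's Retry class: the helper names next_delay and sleep_time are hypothetical, and drawing the jitter from a uniform distribution is an assumption made for illustration.

```python
import random


def next_delay(cur_delay: float, backoff: int = 2, max_delay: int = 3600) -> float:
    """Grow the delay by the backoff multiplier, capped at max_delay."""
    return min(cur_delay * backoff, max_delay)


def sleep_time(cur_delay: float, max_jitter: float = 0.8) -> float:
    """Current delay plus a random amount of up to max_jitter seconds."""
    return cur_delay + random.uniform(0, max_jitter)


delay = 0.1
schedule = []
for _ in range(5):
    schedule.append(delay)
    delay = next_delay(delay)
# schedule now holds the delays before jitter: 0.1, 0.2, 0.4, 0.8, 1.6
```

With the default backoff of 2 the delay doubles on every attempt until it reaches max_delay, while the jitter keeps concurrent clients from retrying in lockstep.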

exception patroni.utils.RetryFailedError(value: Any)

Bases: PatroniException

Maximum number of attempts exhausted in retry operation.

patroni.utils._sleep(interval: int | float) -> None

Wrap sleep().

Parameters:

interval – Delay execution for a given number of seconds. The argument may be a floating point number for subsecond precision.

patroni.utils.cluster_as_json(cluster: Cluster) -> Dict[str, Any]

Get a JSON representation of cluster.

Parameters:

cluster – the Cluster object to be parsed as JSON.

Returns:

JSON representation of cluster.

These are the possible keys in the returned object, depending on the information available in cluster:

  • members: list of members in the cluster. Each value is a dict that may have the following keys:

    • name: the name of the host (unique in the cluster). The members list is sorted by this key;

    • role: leader, standby_leader, sync_standby, or replica;

    • state: stopping, stopped, stop failed, crashed, running, starting, start failed, restarting, restart failed, initializing new cluster, initdb failed, running custom bootstrap script, custom bootstrap failed, or creating replica;

    • api_url: REST API URL based on restapi->connect_address configuration;

    • host: PostgreSQL host based on postgresql->connect_address;

    • port: PostgreSQL port based on postgresql->connect_address;

    • timeline: PostgreSQL current timeline;

    • pending_restart: True if PostgreSQL is pending to be restarted;

    • scheduled_restart: scheduled restart timestamp, if any;

    • tags: any tags that were set for this member;

    • lag: replication lag, if applicable;

  • pause: True if cluster is in maintenance mode;

  • scheduled_switchover: if a switchover has been scheduled, then it contains this entry with these keys:

    • at: timestamp when switchover was scheduled to occur;

    • from: name of the member to be demoted;

    • to: name of the member to be promoted.

patroni.utils.compare_values(vartype: str, unit: str | None, settings_value: Any, config_value: Any) -> bool

Check if the value from pg_settings and from Patroni config are equivalent after parsing them as vartype.

Parameters:
  • vartype

    the target type to parse settings_value and config_value before comparing them. Accepts any of the following (case sensitive):

    • bool: parse values using parse_bool(); or

    • integer: parse values using parse_int(); or

    • real: parse values using parse_real(); or

    • enum: parse values as lowercase strings; or

    • string: parse values as strings. This one is used by default if no valid value is passed as vartype.

  • unit – base unit to be used as argument when calling parse_int() or parse_real() for config_value.

  • settings_value – value to be compared with config_value.

  • config_value – value to be compared with settings_value.

Returns:

True if settings_value is equivalent to config_value when both are parsed as vartype.

Example:
>>> compare_values('enum', None, 'remote_write', 'REMOTE_WRITE')
True
>>> compare_values('string', None, 'remote_write', 'REMOTE_WRITE')
False
>>> compare_values('real', None, '1e-06', 0.000001)
True
>>> compare_values('integer', 'MB', '6GB', '6GB')
False
>>> compare_values('integer', None, '6GB', '6GB')
False
>>> compare_values('integer', '16384kB', '64', ' 0x400 MB ')
True
>>> compare_values('integer', '2MB', 524288, '1TB')
True
>>> compare_values('integer', 'MB', 1048576, '1TB')
True
>>> compare_values('integer', 'kB', 4098, '4097.5kB')
True

patroni.utils.convert_int_from_base_unit(base_value: int, base_unit: str | None) -> str | None

Convert an integer value in some base unit to a human-friendly unit.

The output unit is chosen so that it’s the greatest unit that can represent the value without loss.

Parameters:
  • base_value – value to be converted from a base unit.

  • base_unit

    unit of value. Should be one of the base units (case sensitive):

    • For space: B, kB, MB;

    • For time: ms, s, min.

Returns:

str value representing base_value converted from base_unit to the greatest possible human-friendly unit, or None if conversion failed.

Example:
>>> convert_int_from_base_unit(1024, 'kB')
'1MB'
>>> convert_int_from_base_unit(1025, 'kB')
'1025kB'
>>> convert_int_from_base_unit(4, '256MB')
'1GB'
>>> convert_int_from_base_unit(4, '256 MB') is None
True
>>> convert_int_from_base_unit(1024, 'KB') is None
True

patroni.utils.convert_real_from_base_unit(base_value: float, base_unit: str | None) -> str | None

Convert a floating-point value in some base unit to a human-friendly unit.

Same as convert_int_from_base_unit(), except we have to do the math a bit differently, and there’s a possibility that we don’t find any exact divisor.

Parameters:
  • base_value – value to be converted from a base unit.

  • base_unit

    unit of value. Should be one of the base units (case sensitive):

    • For space: B, kB, MB;

    • For time: ms, s, min.

Returns:

str value representing base_value converted from base_unit to the greatest possible human-friendly unit, or None if conversion failed.

Example:
>>> convert_real_from_base_unit(5, 'ms')
'5ms'
>>> convert_real_from_base_unit(2.5, 'ms')
'2500us'
>>> convert_real_from_base_unit(4.0, '256MB')
'1GB'
>>> convert_real_from_base_unit(4.0, '256 MB') is None
True

patroni.utils.convert_to_base_unit(value: int | float, unit: str, base_unit: str | None) -> int | float | None

Convert value as a unit of compute information or time to base_unit.

Parameters:
  • value – value to be converted to the base unit.

  • unit

    unit of value. Accepts these units (case sensitive):

    • For space: B, kB, MB, GB, or TB;

    • For time: d, h, min, s, ms, or us.

  • base_unit

    target unit in the conversion. May contain the target unit with an associated value, e.g. 512MB. Accepts these units (case sensitive):

    • For space: B, kB, or MB;

    • For time: ms, s, or min.

Returns:

value in unit converted to base_unit. Returns None if unit or base_unit is invalid.

Example:
>>> convert_to_base_unit(1, 'GB', '256MB')
4
>>> convert_to_base_unit(1, 'GB', 'MB')
1024
>>> convert_to_base_unit(1, 'gB', '512MB') is None
True
>>> convert_to_base_unit(1, 'GB', '512 MB') is None
True

patroni.utils.data_directory_is_empty(data_dir: str) -> bool

Check if a PostgreSQL data directory is empty.

Note

In non-Windows environments data_dir is also considered empty if it only contains hidden files and/or lost+found directory.

Parameters:

data_dir – the PostgreSQL data directory to be checked.

Returns:

True if data_dir is empty.
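
The rule described in the note can be sketched as follows. This is a self-contained approximation rather than Patroni's implementation: the name looks_empty is hypothetical, and treating a missing directory as empty is an assumption.

```python
import os
import tempfile


def looks_empty(data_dir: str) -> bool:
    """Return True if data_dir is missing or holds nothing that counts as content."""
    if not os.path.exists(data_dir):
        return True  # assumption: a missing directory is treated as empty
    for name in os.listdir(data_dir):
        # Hidden files and lost+found are only ignored outside Windows.
        ignorable = os.name != 'nt' and (name.startswith('.') or name == 'lost+found')
        if not ignorable:
            return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    os.mkdir(os.path.join(tmp, 'lost+found'))
    only_ignorable = looks_empty(tmp)
    open(os.path.join(tmp, 'PG_VERSION'), 'w').close()
    with_content = looks_empty(tmp)
```

On a non-Windows system only_ignorable is True, because lost+found alone does not count as content, while with_content is False once a regular file exists.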

patroni.utils.deep_compare(obj1: Dict[Any, Any | Dict[Any, Any]], obj2: Dict[Any, Any | Dict[Any, Any]]) -> bool

Recursively compare two dictionaries to check if they are equal in terms of keys and values.

Note

Values are compared based on their string representation.

Parameters:
  • obj1 – dictionary to be compared with obj2.

  • obj2 – dictionary to be compared with obj1.

Returns:

True if all keys and values match between the two dictionaries.

Example:
>>> deep_compare({'1': None}, {})
False
>>> deep_compare({'1': {}}, {'1': None})
False
>>> deep_compare({'1': [1]}, {'1': [2]})
False
>>> deep_compare({'1': 2}, {'1': '2'})
True
>>> deep_compare({'1': {'2': [3, 4]}}, {'1': {'2': [3, 4]}})
True

patroni.utils.enable_keepalive(sock: socket, timeout: int, idle: int, cnt: int = 3) -> None

Enable keepalive for sock.

Will set socket options depending on the platform, as per return of keepalive_socket_options().

Note

Value for TCP_KEEPINTVL will be calculated through keepalive_intvl() based on timeout, idle, and cnt.

Parameters:
  • sock – the socket for which keepalive will be enabled.

  • timeout – value for TCP_USER_TIMEOUT.

  • idle – value for TCP_KEEPIDLE.

  • cnt – value for TCP_KEEPCNT.

Returns:

output of ioctl() if we are on Windows, nothing otherwise.

patroni.utils.get_conversion_table(base_unit: str) -> Dict[str, Dict[str, int | float]]

Get conversion table for the specified base unit.

If no conversion table exists for the passed unit, return an empty OrderedDict.

Parameters:

base_unit – unit to choose the conversion table for.

Returns:

OrderedDict object.

patroni.utils.get_major_version(bin_dir: str | None = None, bin_name: str = 'postgres') -> str

Get the major version of PostgreSQL.

It is based on the output of postgres --version.

Parameters:
  • bin_dir – path to the PostgreSQL binaries directory. If None or an empty string, it will use the first bin_name binary that is found by the subprocess in the PATH.

  • bin_name – name of the postgres binary to call (postgres by default).

Returns:

the PostgreSQL major version.

Raises:

PatroniException: if the postgres binary call failed due to OSError.

Example:
  • Returns 9.6 for PostgreSQL 9.6.24

  • Returns 15 for PostgreSQL 15.2
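
The mapping from version output to major version can be sketched without calling any binary. The helper below is hypothetical and only parses text shaped like the output of postgres --version (for example "postgres (PostgreSQL) 15.2"); the regular expression is an illustrative choice, not necessarily the one Patroni uses.

```python
import re


def major_from_version_output(output: str) -> str:
    """Extract the major version from text such as 'postgres (PostgreSQL) 15.2'."""
    match = re.search(r'(\d+)(?:\.(\d+))?', output)
    if match is None:
        raise ValueError('no version number found in: {0!r}'.format(output))
    first, second = match.group(1), match.group(2)
    # Before PostgreSQL 10 the major version had two components (e.g. 9.6).
    if int(first) < 10 and second is not None:
        return '{0}.{1}'.format(first, second)
    return first


old_style = major_from_version_output('postgres (PostgreSQL) 9.6.24')
new_style = major_from_version_output('postgres (PostgreSQL) 15.2')
```

Here old_style is '9.6' and new_style is '15', matching the two cases listed above.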

patroni.utils.is_subpath(d1: str, d2: str) -> bool

Check if the file system path d2 is contained within d1 after resolving symbolic links.

Note

It will not check if the paths actually exist, it will only expand the paths and resolve any symbolic links that happen to be found.

Parameters:
  • d1 – path to a directory.

  • d2 – path to check for containment within d1.

Returns:

True if d2 is a subpath of d1.
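
A containment check of this kind can be sketched with the standard library alone. The function below is an illustration under assumptions, not Patroni's code: the name is_subpath_sketch is hypothetical, a relative d2 is interpreted relative to d1, and os.path.commonpath is used for the comparison.

```python
import os


def is_subpath_sketch(d1: str, d2: str) -> bool:
    """Return True if d2, resolved relative to d1, stays inside d1."""
    real_d1 = os.path.realpath(d1)
    # A relative d2 is joined onto d1; an absolute d2 replaces it entirely.
    real_d2 = os.path.realpath(os.path.join(real_d1, d2))
    return os.path.commonpath([real_d1, real_d2]) == real_d1


inside = is_subpath_sketch('/tmp/base', 'pg_wal')
outside = is_subpath_sketch('/tmp/base', '../elsewhere')
```

Neither path has to exist: realpath only normalizes the paths and resolves whatever symbolic links it finds, so inside is True and outside is False.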

patroni.utils.iter_response_objects(response: HTTPResponse) -> Iterator[Dict[str, Any]]

Iterate over the chunks of an HTTPResponse and yield each JSON document that is found.

Parameters:

response – the HTTP response from which JSON documents will be retrieved.

Yields:

current JSON document.
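
Extracting JSON documents from a chunked stream can be sketched with json.JSONDecoder.raw_decode(). The example works on a plain iterable of byte chunks instead of an HTTPResponse, and the name iter_json_documents is hypothetical. It assumes every document is a JSON object or array, so that a document split across chunks cannot be mistaken for a complete one.

```python
import codecs
import json
from typing import Any, Iterable, Iterator


def iter_json_documents(chunks: Iterable[bytes]) -> Iterator[Any]:
    """Yield each complete JSON document found in a stream of byte chunks."""
    decoder = json.JSONDecoder()
    # The incremental decoder copes with multi-byte characters split across chunks.
    utf8 = codecs.getincrementaldecoder('utf-8')()
    buffer = ''
    for chunk in chunks:
        buffer += utf8.decode(chunk)
        while True:
            buffer = buffer.lstrip()
            if not buffer:
                break
            try:
                document, end = decoder.raw_decode(buffer)
            except ValueError:
                break  # incomplete document: wait for the next chunk
            yield document
            buffer = buffer[end:]


chunks = [b'{"a": 1}\n{"b"', b': 2}{"c": 3}']
documents = list(iter_json_documents(chunks))
```

The second document is split across the two chunks, so it is only yielded once the remainder arrives; documents ends up holding all three objects in order.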

patroni.utils.keepalive_intvl(timeout: int, idle: int, cnt: int = 3) -> int

Calculate the value to be used as TCP_KEEPINTVL based on timeout, idle, and cnt.

Parameters:
  • timeout – value for TCP_USER_TIMEOUT.

  • idle – value for TCP_KEEPIDLE.

  • cnt – value for TCP_KEEPCNT.

Returns:

the value to be used as TCP_KEEPINTVL.
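
The exact formula is not stated here. One plausible calculation, shown purely as an assumption, is to spread the time that remains after the idle period evenly across the cnt probes, with a floor of one second. The name keepalive_intvl_sketch is hypothetical.

```python
def keepalive_intvl_sketch(timeout: int, idle: int, cnt: int = 3) -> int:
    """Spread the time left after the idle period evenly across cnt probes."""
    # Assumed formula; never return less than one second between probes.
    return max(1, int((timeout - idle) / cnt))


interval = keepalive_intvl_sketch(timeout=30, idle=10, cnt=3)
```

With a 30-second timeout and a 10-second idle period, 20 seconds remain for three probes, so interval is 6.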

patroni.utils.keepalive_socket_options(timeout: int, idle: int, cnt: int = 3) -> Iterator[Tuple[int, int, int]]

Get all keepalive related options to be set in a socket.

Parameters:
  • timeout – value for TCP_USER_TIMEOUT.

  • idle – value for TCP_KEEPIDLE.

  • cnt – value for TCP_KEEPCNT.

Yields:

all keepalive related socket options to be set. The first item in the tuple is the protocol, the second item is the option, and the third item is the value to be used. The return values depend on the platform:

  • Windows:
    • SO_KEEPALIVE.

  • Linux:
    • SO_KEEPALIVE;

    • TCP_USER_TIMEOUT;

    • TCP_KEEPIDLE;

    • TCP_KEEPINTVL;

    • TCP_KEEPCNT.

  • MacOS:
    • SO_KEEPALIVE;

    • TCP_KEEPIDLE;

    • TCP_KEEPINTVL;

    • TCP_KEEPCNT.
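
Tuples in this (protocol, option, value) shape map directly onto socket.setsockopt(). The sketch below applies a hand-written option list to a socket; the helper apply_socket_options and the option list itself are hypothetical and do not reproduce what keepalive_socket_options() yields on any particular platform.

```python
import socket
from typing import Iterable, Tuple


def apply_socket_options(sock: socket.socket, options: Iterable[Tuple[int, int, int]]) -> None:
    """Apply each (protocol, option, value) tuple to sock."""
    for level, option, value in options:
        sock.setsockopt(level, option, value)


# Hypothetical option list; only SO_KEEPALIVE is assumed to exist everywhere.
options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
if hasattr(socket, 'TCP_KEEPCNT'):
    options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3))

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    apply_socket_options(sock, options)
    keepalive_enabled = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
```

Guarding platform-specific constants with hasattr() mirrors the reason the yielded options differ between Windows, Linux, and MacOS.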

patroni.utils.maybe_convert_from_base_unit(base_value: str, vartype: str, base_unit: str | None) -> str

Try to convert integer or real value in a base unit to a human-readable unit.

Value is passed as a string. If parsing or subsequent conversion fails, the original value is returned.

Parameters:
  • base_value – value to be converted from a base unit.

  • vartype – the target type to parse base_value before converting (integer or real is expected, any other type results in return value being equal to the base_value string).

  • base_unit

    unit of value. Should be one of the base units (case sensitive):

    • For space: B, kB, MB;

    • For time: ms, s, min.

Returns:

str value representing base_value converted from base_unit to the greatest possible human-friendly unit, or base_value string if conversion failed.

Example:
>>> maybe_convert_from_base_unit('5', 'integer', 'ms')
'5ms'
>>> maybe_convert_from_base_unit('4.2', 'real', 'ms')
'4200us'
>>> maybe_convert_from_base_unit('on', 'bool', None)
'on'
>>> maybe_convert_from_base_unit('', 'integer', '256MB')
''

patroni.utils.parse_bool(value: Any) -> bool | None

Parse a given value to a bool object.

Note

The parsing is case-insensitive, and takes into consideration these values:
  • on, true, yes, and 1 as True.

  • off, false, no, and 0 as False.

Parameters:

value – value to be parsed to bool.

Returns:

the parsed value. If not able to parse, returns None.

Example:
>>> parse_bool(1)
True
>>> parse_bool('off')
False
>>> parse_bool('foo')

patroni.utils.parse_int(value: Any, base_unit: str | None = None) -> int | None

Parse value as an int.

Parameters:
  • value – any value that can be handled either by strtol() or strtod(). If value contains a unit, then base_unit must be given.

  • base_unit – an optional base unit to convert value through convert_to_base_unit(). Not used if value does not contain a unit.

Returns:

the parsed value, if able to parse. Otherwise returns None.

Example:
>>> parse_int('1') == 1
True
>>> parse_int(' 0x400 MB ', '16384kB') == 64
True
>>> parse_int('1MB', 'kB') == 1024
True
>>> parse_int('1000 ms', 's') == 1
True
>>> parse_int('1TB', 'GB') is None
True
>>> parse_int(50, None) == 50
True
>>> parse_int("51", None) == 51
True
>>> parse_int("nonsense", None) == None
True
>>> parse_int("nonsense", "kB") == None
True
>>> parse_int("nonsense") == None
True
>>> parse_int(0) == 0
True
>>> parse_int('6GB', '16MB') == 384
True
>>> parse_int('4097.4kB', 'kB') == 4097
True
>>> parse_int('4097.5kB', 'kB') == 4098
True

patroni.utils.parse_real(value: Any, base_unit: str | None = None) -> float | None

Parse value as a float.

Parameters:
  • value – any value that can be handled by strtod(). If value contains a unit, then base_unit must be given.

  • base_unit – an optional base unit to convert value through convert_to_base_unit(). Not used if value does not contain a unit.

Returns:

the parsed value, if able to parse. Otherwise returns None.

Example:
>>> parse_real(' +0.0005 ') == 0.0005
True
>>> parse_real('0.0005ms', 'ms') == 0.0
True
>>> parse_real('0.00051ms', 'ms') == 0.001
True

patroni.utils.patch_config(config: Dict[Any, Any | Dict[Any, Any]], data: Dict[Any, Any | Dict[Any, Any]]) -> bool

Update and append to dictionary config from overrides in data.

Note

  • If the value of a given key in data is None, then the key is removed from config;

  • If a key is present in data but not in config, the key with the corresponding value is added to config;

  • For keys that are present on both sides, the string representations of the corresponding values are compared, and the value in config is overridden if they differ.

Parameters:
  • config – configuration to be patched.

  • data – new configuration values to patch config with.

Returns:

True if config was changed.
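
The three rules in the note can be sketched as a small in-place merge. This is an approximation under assumptions rather than Patroni's implementation: the name patch_config_sketch is hypothetical, and recursing into nested dictionaries is assumed from the type hints.

```python
from typing import Any, Dict


def patch_config_sketch(config: Dict[Any, Any], data: Dict[Any, Any]) -> bool:
    """Apply overrides from data to config in place; return True if config changed."""
    changed = False
    for key, value in data.items():
        if value is None:
            # A None override removes the key.
            if key in config:
                del config[key]
                changed = True
        elif key not in config:
            config[key] = value
            changed = True
        elif isinstance(value, dict) and isinstance(config[key], dict):
            # Assumption: nested dictionaries are patched recursively.
            changed = patch_config_sketch(config[key], value) or changed
        elif str(config[key]) != str(value):
            config[key] = value
            changed = True
    return changed


config = {'ttl': 30, 'loop_wait': 10, 'postgresql': {'parameters': {'max_connections': 100}}}
changed = patch_config_sketch(config, {'ttl': '30', 'loop_wait': None, 'retry_timeout': 10})
```

After the call ttl is untouched because '30' and 30 have the same string representation, loop_wait is gone, retry_timeout has been added, and changed is True.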

patroni.utils.polling_loop(timeout: int | float, interval: int | float = 1) -> Iterator[int]

Return an iterator that returns values every interval seconds until timeout has passed.

Note

Timeout is measured from start of iteration.

Parameters:
  • timeout – for how long (in seconds) from now it should keep returning values.

  • interval – for how long to sleep before returning a new value.

Yields:

current iteration counter, starting from 0.
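
A generator with this behavior can be sketched in a few lines. The name polling_loop_sketch is hypothetical, and using time.monotonic() as the clock is an assumption. Because the body of a generator only starts running on the first next() call, the timeout is naturally measured from the start of iteration.

```python
import time
from typing import Iterator, Union


def polling_loop_sketch(timeout: Union[int, float], interval: Union[int, float] = 1) -> Iterator[int]:
    """Yield 0, 1, 2, ... and sleep interval seconds after each value until timeout elapses."""
    end_time = time.monotonic() + timeout
    iteration = 0
    while time.monotonic() < end_time:
        yield iteration
        iteration += 1
        time.sleep(interval)


ticks = list(polling_loop_sketch(timeout=0.2, interval=0.05))
```

A typical caller checks some condition inside the for loop and breaks out early on success; if the loop finishes without a break, the timeout has expired.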

patroni.utils.read_stripped(file_path: str) -> Iterator[str]

Iterate over stripped lines in the given file.

Parameters:

file_path – path to the file to read from.

Yields:

each line from the given file, stripped.
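
A minimal sketch of such a generator follows, assuming that "stripped" means leading and trailing whitespace is removed with str.strip(). The name read_stripped_sketch is hypothetical.

```python
import os
import tempfile
from typing import Iterator


def read_stripped_sketch(file_path: str) -> Iterator[str]:
    """Yield every line of the file with surrounding whitespace removed."""
    with open(file_path) as handle:
        for line in handle:
            yield line.strip()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'example.conf')
    with open(path, 'w') as handle:
        handle.write('  first \n\tsecond\n')
    lines = list(read_stripped_sketch(path))
```

The file is read lazily, one line at a time, so the generator has to be consumed while the file still exists.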

patroni.utils.split_host_port(value: str, default_port: int | None) -> Tuple[str, int]

Extract host(s) and port from value.

Parameters:
  • value

    string from where host(s) and port will be extracted. Accepts either of these formats:

    • host:port; or

    • host1,host2,...,hostn:port.

    Each host portion of value can be either:

    • An FQDN; or

    • An IPv4 address; or

    • An IPv6 address, with or without square brackets.

  • default_port – if no port can be found in value, use default_port instead.

Returns:

the first item is composed of a CSV list of hosts from value, and the second item is either the port from value or default_port.

Example:
>>> split_host_port('127.0.0.1', 5432)
('127.0.0.1', 5432)
>>> split_host_port('127.0.0.1:5400', 5432)
('127.0.0.1', 5400)
>>> split_host_port('127.0.0.1,192.168.0.101:5400', 5432)
('127.0.0.1,192.168.0.101', 5400)
>>> split_host_port('127.0.0.1,www.mydomain.com,[fe80:0:0:0:213:72ff:fe3c:21bf], 0:0:0:0:0:0:0:0:5400', 5432)
('127.0.0.1,www.mydomain.com,fe80:0:0:0:213:72ff:fe3c:21bf,0:0:0:0:0:0:0:0', 5400)

patroni.utils.strtod(value: Any) -> Tuple[float | None, str]

Extract the double precision part from the beginning of a string that represents a configuration value.

As close an equivalent as possible of the strtod(3) C function, which is used by postgres to parse parameter values.

Parameters:

value – any value from which we want to extract a double precision.

Returns:

the first item is the extracted double precision from value, and the second item is the remaining string of value. If not able to match a double precision in value, then the first item will be None, and the second item will be the original value.

Example:
>>> strtod(' A ') == (None, 'A')
True
>>> strtod('1 A ') == (1.0, ' A')
True
>>> strtod('1.5A') == (1.5, 'A')
True
>>> strtod('8.325e-10A B C') == (8.325e-10, 'A B C')
True

patroni.utils.strtol(value: Any, strict: bool | None = True) -> Tuple[int | None, str]

Extract the long integer part from the beginning of a string that represents a configuration value.

As close an equivalent as possible of the strtol(3) C function (with base=0), which is used by postgres to parse parameter values.

Takes into consideration numbers represented either as hex, octal or decimal formats.

Parameters:
  • value – any value from which we want to extract a long integer.

  • strict – dictates how the first item in the returning tuple is set when strtol() is not able to find a long integer in value. If strict is True, then the first item will be None, else it will be 1.

Returns:

the first item is the extracted long integer from value, and the second item is the remaining string of value. If not able to match a long integer in value, then the first item will be either None or 1 (depending on strict argument), and the second item will be the original value.

Example:
>>> strtol(0) == (0, '')
True
>>> strtol(1) == (1, '')
True
>>> strtol(9) == (9, '')
True
>>> strtol(' +0x400MB') == (1024, 'MB')
True
>>> strtol(' -070d') == (-56, 'd')
True
>>> strtol(' d ') == (None, 'd')
True
>>> strtol(' 1 d ') == (1, ' d')
True
>>> strtol('9s', False) == (9, 's')
True
>>> strtol(' s ', False) == (1, 's')
True

patroni.utils.unquote(string: str) -> str

Unquote a fully quoted string.

Parameters:

string – The string to be checked for quoting.

Returns:

The string with quotes removed, if it is a fully quoted single string, or the original string if quoting is not detected, or unquoting was not possible.

Examples:

A string with quotes will have those quotes removed

>>> unquote('"a quoted string"')
'a quoted string'

A string with multiple quotes will be returned as is

>>> unquote('"a multi" "quoted string"')
'"a multi" "quoted string"'

So will a string with unbalanced quotes

>>> unquote('unbalanced "quoted string')
'unbalanced "quoted string'

patroni.utils.uri(proto: str, netloc: List[str] | Tuple[str, int | str] | str, path: str | None = '', user: str | None = None) -> str

Construct URI from given arguments.

Parameters:
  • proto – the URI protocol.

  • netloc

    the URI host(s) and port. Can be specified in either of these ways:

    • A list or tuple. The second item should be a port, and the first item should be composed of hosts in either of these formats:

      • host; or

      • host1,host2,...,hostn.

    • A str in either of these formats:

      • host:port; or

      • host1,host2,...,hostn:port.

    In all cases, each host portion of netloc can be either:

    • An FQDN; or

    • An IPv4 address; or

    • An IPv6 address, with or without square brackets.

  • path – the URI path.

  • user – the authenticating user, if any.

Returns:

constructed URI.
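
How the pieces fit together can be sketched for the simplest case, a (host, port) tuple. This is a simplified illustration, not Patroni's implementation: the name build_uri is hypothetical, string netloc values are not handled, and wrapping a bare IPv6 address in square brackets is an assumption.

```python
from typing import Optional, Tuple, Union


def build_uri(proto: str, netloc: Tuple[str, Union[int, str]], path: str = '',
              user: Optional[str] = None) -> str:
    """Assemble proto://[user@]host[:port][/path] from a (host, port) pair."""
    host, port = netloc
    # Assumption: a bare IPv6 address (contains ':') gets wrapped in square brackets.
    if ':' in host and not host.startswith('['):
        host = '[{0}]'.format(host)
    port_part = ':{0}'.format(port) if port else ''
    path_part = '/{0}'.format(path) if path and not path.startswith('/') else path
    user_part = '{0}@'.format(user) if user else ''
    return '{0}://{1}{2}{3}{4}'.format(proto, user_part, host, port_part, path_part)


api_url = build_uri('http', ('127.0.0.1', 8008), 'patroni')
ipv6_url = build_uri('postgres', ('::1', 5432), user='admin')
```

In this sketch api_url is 'http://127.0.0.1:8008/patroni' and ipv6_url is 'postgres://admin@[::1]:5432'.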

patroni.utils.validate_directory(d: str, msg: str = '{} {}') -> None

Ensure directory exists and is writable.

Note

If the directory does not exist, validate_directory() will attempt to create it.

Parameters:
  • d – the directory to be checked.

  • msg

    a message to be thrown when raising PatroniException, if any issue is faced. It must contain 2 placeholders to be used by format():

    • The first placeholder will be replaced with path d;

    • The second placeholder will be replaced with the error condition.

Raises:

PatroniException: if any issue is observed while validating d. Can be thrown if:

  • d did not exist, and validate_directory() was not able to create it; or

  • d is an existing directory, but Patroni is not able to write to that directory; or

  • d is an existing file, not a directory.
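
The three failure conditions can be sketched with the standard library. The code below is an approximation under assumptions: DirectoryError is a hypothetical stand-in for PatroniException, the error texts are invented for illustration, and probing writability by creating a temporary file is one possible technique rather than a confirmed detail of Patroni.

```python
import os
import tempfile


class DirectoryError(Exception):
    """Hypothetical stand-in for PatroniException."""


def validate_directory_sketch(d: str, msg: str = '{} {}') -> None:
    """Create d if it is missing and make sure it is a writable directory."""
    if not os.path.exists(d):
        try:
            os.makedirs(d)
        except OSError:
            raise DirectoryError(msg.format(d, "couldn't create the directory"))
    elif os.path.isdir(d):
        try:
            # Writability is probed by creating and removing a temporary file.
            fd, probe = tempfile.mkstemp(dir=d)
            os.close(fd)
            os.remove(probe)
        except OSError:
            raise DirectoryError(msg.format(d, 'the directory is not writable'))
    else:
        raise DirectoryError(msg.format(d, 'is not a directory'))


with tempfile.TemporaryDirectory() as tmp:
    new_dir = os.path.join(tmp, 'data')
    validate_directory_sketch(new_dir)  # missing, so it gets created
    created = os.path.isdir(new_dir)
    plain_file = os.path.join(tmp, 'file.txt')
    open(plain_file, 'w').close()
    try:
        validate_directory_sketch(plain_file, 'Directory {} {}')
        file_error = ''
    except DirectoryError as exc:
        file_error = str(exc)
```

The two placeholders in msg receive the path and the error condition, so file_error reads "Directory", then the path, then "is not a directory".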