patroni.postgresql.slots module

Replication slot handling.

Provides classes for the creation, monitoring, management and synchronisation of PostgreSQL replication slots.

class patroni.postgresql.slots.SlotsAdvanceThread(slots_handler: SlotsHandler)

Bases: Thread

Daemon Thread object for advancing logical replication slots on replicas.

This ensures that slot advancing queries sent to postgres do not block the main loop.

__init__(slots_handler: SlotsHandler) → None

Create and start a new thread for handling slot advance queries.

Parameters:

slots_handler – The calling class instance for reference to slot information attributes.

on_promote() → None

Reset state of the daemon.

run() → None

Thread main loop entrypoint.

Note

Thread will wait until a sync is scheduled from outside, normally triggered during the HA loop or a wakeup call.
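The wait-until-scheduled behaviour described in the note can be illustrated with a small, self-contained sketch. This is not Patroni's implementation: the class name AdvanceWorker, its attributes and the use of threading.Condition are assumptions chosen only to show the general pattern of a daemon thread that sleeps until the main loop hands it work.

```python
import threading
from typing import Dict, List


class AdvanceWorker(threading.Thread):
    """Hypothetical stand-in for a slot-advancing daemon thread."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._condition = threading.Condition()
        self._scheduled: Dict[str, int] = {}
        self.processed: List[str] = []
        self.done = threading.Event()
        self.start()

    def schedule(self, slots: Dict[str, int]) -> None:
        # Called from the main loop: hand over work and wake the thread.
        with self._condition:
            self._scheduled.update(slots)
            self.done.clear()
            self._condition.notify()

    def run(self) -> None:
        while True:
            with self._condition:
                # Sleep until work has been scheduled from outside.
                while not self._scheduled:
                    self._condition.wait()
                work, self._scheduled = self._scheduled, {}
            # The slow part runs outside the lock, so schedule() never blocks on it.
            self.processed.extend(sorted(work))
            self.done.set()
```

In this sketch the caller returns from schedule() immediately, and the potentially slow work happens on the worker thread, which is the property the class description relies on to keep the main loop responsive.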

schedule(advance_slots: Dict[str, Dict[str, int]]) → Tuple[bool, List[str]]

Trigger a synchronisation of slots.

This is the main entrypoint for Patroni HA loop wakeup call.

Parameters:

advance_slots – dictionary containing slots that need to be advanced

Returns:

tuple of failure status and a list of slots to be copied

sync_slot(cur: cursor | Cursor[Any], database: str, slot: str, lsn: int) → None

Execute a pg_replication_slot_advance query and store success for scheduled synchronisation task.

Parameters:
  • cur – database connection cursor.

  • database – name of the database associated with the slot.

  • slot – name of the slot to be synchronised.

  • lsn – last known LSN position
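Because lsn is passed as an integer while PostgreSQL writes LSNs in the textual X/Y hexadecimal form, a conversion is needed before the value can be handed to pg_replication_slot_advance. The following is a minimal sketch under that assumption; the helper names format_lsn and build_advance_query are hypothetical and are not taken from this module.

```python
from typing import Tuple


def format_lsn(lsn: int) -> str:
    """Render an integer LSN as PostgreSQL's 'high/low' hexadecimal text."""
    return '{0:X}/{1:X}'.format(lsn >> 32, lsn & 0xFFFFFFFF)


def build_advance_query(slot: str, lsn: int) -> Tuple[str, Tuple[str, str]]:
    """Return SQL and parameters for advancing ``slot`` up to ``lsn``."""
    # Parameters are passed separately so the driver handles quoting.
    sql = 'SELECT pg_catalog.pg_replication_slot_advance(%s, %s)'
    return sql, (slot, format_lsn(lsn))
```

The returned pair could then be passed to a cursor's execute() method; how the real method records success for the scheduled task is not shown here.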

sync_slots() → None

Synchronise slots for all scheduled databases.

sync_slots_in_database(database: str, slots: List[str]) → None

Synchronise slots for a single database.

Parameters:
  • database – name of the database.

  • slots – list of slot names to synchronise.

class patroni.postgresql.slots.SlotsHandler(postgresql: Postgresql)

Bases: object

Handler for managing and storing information on replication slots in PostgreSQL.

Variables:
  • pg_replslot_dir – system location path of the PostgreSQL replication slots.

  • _logical_slots_processing_queue – yet to be processed logical replication slots on the primary

__init__(postgresql: Postgresql) → None

Create an instance with storage attributes for replication slots and schedule the first synchronisation.

Parameters:

postgresql – Calling class instance providing interface to PostgreSQL.

static _copy_items(src: Dict[str, Any], dst: Dict[str, Any], keys: Collection[str] | None = None) → None

Select values from src dictionary to update in dst dictionary for optional supplied keys.

Parameters:
  • src – source dictionary that keys will be looked up from.

  • dst – destination dictionary to be updated.

  • keys – optional list of keys to be looked up in the source dictionary.
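A selective dictionary copy of this kind might look like the sketch below. The function name copy_items, the default key set and the choice to skip keys missing from src are all assumptions made for illustration; the actual defaults and error handling may differ.

```python
from typing import Any, Collection, Dict, Optional

# Assumed default: slot attributes mentioned elsewhere on this page.
DEFAULT_KEYS = ('datoid', 'catalog_xmin', 'confirmed_flush_lsn')


def copy_items(src: Dict[str, Any], dst: Dict[str, Any],
               keys: Optional[Collection[str]] = None) -> None:
    """Copy the selected keys from ``src`` into ``dst`` in place."""
    for key in keys or DEFAULT_KEYS:
        # Assumption: keys absent from the source are silently skipped.
        if key in src:
            dst[key] = src[key]
```

For example, copying only catalog_xmin from a freshly queried slot into a cached entry leaves every other cached attribute untouched.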

_drop_incorrect_slots(cluster: Cluster, slots: Dict[str, Any]) → None

Compare required slots and slots configured as permanent with those found, dropping extraneous ones.

Note

Slots that are not contained in slots will be dropped. Slots can be filtered out with ignore_slots configuration.

Slots that have matching names but do not match attributes in slots will also be dropped.

Parameters:
  • cluster – cluster state information object.

  • slots – dictionary of desired slot names as keys with slot attributes as a dictionary value, if known.

_ensure_logical_slots_primary(slots: Dict[str, Any]) → None

Create any missing logical replication slots on the primary.

If the logical slot already exists, copy state information into the replication slots structure stored in the class instance.

Parameters:

slots – Slots that should exist are supplied in a dictionary, mapping slot name to any attributes. The method will only consider slots that have a value that is a dictionary with a key type with a value that is logical.

_ensure_logical_slots_replica(slots: Dict[str, Any]) → List[str]

Update logical slots on replicas.

If the logical slot already exists, copy state information into the replication slots structure stored in the class instance. Slots that exist are also advanced if their confirmed_flush_lsn is greater than the stored state of the slot.

As logical slots can only be created when the primary is available, pass the list of slots that need to be copied back to the caller. They will be created on replicas with SlotsHandler.copy_logical_slots().

Parameters:

slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key type and a value of logical.

Returns:

list of slots to be copied from the primary.

_ensure_physical_slots(slots: Dict[str, Any]) → None

Create or advance physical replication slots.

Any failures are logged and do not interrupt creation of all slots.

Parameters:

slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key type and a value of physical.

_get_leader_connection_cursor(leader: Leader) → Iterator[cursor | Cursor[Any]]

Create a new database connection to the leader.

Note

Uses rewind user credentials because it has enough permissions to read files from PGDATA. Sets the options connect_timeout to 3 and statement_timeout to 2000.

Parameters:

leader – object with information on the leader

Yields:

connection cursor object, note implementation varies depending on version of psycopg.

_query(sql: str, *params: Any) → List[Tuple[Any, ...]]

Helper method for Postgresql.query().

Parameters:
  • sql – SQL statement to execute.

  • params – parameters to pass through to Postgresql.query().

Returns:

query response.

_ready_logical_slots(primary_physical_catalog_xmin: int | None = None) → None

Ready logical slots by comparing primary physical slot catalog_xmin to logical catalog_xmin.

The logical slot on a replica is safe to use when the physical replica slot on the primary:

  1. has a nonzero/non-null catalog_xmin represented by primary_physical_catalog_xmin.

  2. has a catalog_xmin that is not newer (greater) than the catalog_xmin of any slot on the standby

  3. overtook the catalog_xmin of remembered values of logical slots on the primary.

Parameters:

primary_physical_catalog_xmin – is the value retrieved from pg_catalog.pg_get_replication_slots() for the physical replication slot on the primary.
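One way to read the three conditions above is as a single predicate over three catalog_xmin values. The sketch below is an interpretation, not the module's code: the function name and the two extra arguments are hypothetical, values are compared as plain integers, and transaction ID wraparound is deliberately ignored.

```python
from typing import Optional


def logical_slot_is_ready(primary_physical_catalog_xmin: Optional[int],
                          standby_logical_catalog_xmin: int,
                          remembered_primary_logical_catalog_xmin: Optional[int]) -> bool:
    """Return True when all three readiness conditions hold."""
    # 1. The physical slot on the primary must report a usable catalog_xmin.
    if not primary_physical_catalog_xmin:
        return False
    # 2. It must not be newer than the logical slot's catalog_xmin on the standby.
    if primary_physical_catalog_xmin > standby_logical_catalog_xmin:
        return False
    # 3. It must have reached the value remembered from the primary's logical slot.
    if remembered_primary_logical_catalog_xmin is None:
        return False
    return remembered_primary_logical_catalog_xmin <= primary_physical_catalog_xmin
```

Under this reading a slot is ready only when the remembered value, the physical slot value and the standby logical slot value are in non-decreasing order.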

_update_pending_logical_slot_primary(slots: Dict[str, Any], catalog_xmin: int | None = None) → bool

Store pending logical slot information for catalog_xmin on the primary.

Remember catalog_xmin of logical slots on the primary when catalog_xmin of the physical slot became valid. Logical slots on replica will be safe to use after promote when catalog_xmin of the physical slot overtakes these values.

Parameters:
  • slots – dictionary of slot information from the primary

  • catalog_xmin – catalog_xmin of the physical slot used by this replica to stream changes from primary.

Returns:

False if any issue was faced while processing, True otherwise.

check_logical_slots_readiness(cluster: Cluster, tags: Tags) → bool

Determine whether all known logical slots are synchronised from the leader.

  1. Retrieve the current catalog_xmin value for the physical slot from the cluster leader.

  2. Use the previously stored list of “unready” logical slots, i.e. those that have yet to be checked and hence have no stored slot attributes.

  3. Store the logical slot catalog_xmin when the physical slot catalog_xmin becomes valid.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • tags – reference to an object implementing Tags interface.

Returns:

False if any issue was encountered while checking logical slots readiness, True otherwise.

copy_logical_slots(cluster: Cluster, tags: Tags, create_slots: List[str]) → None

Create logical replication slots on standby nodes.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • tags – reference to an object implementing Tags interface.

  • create_slots – list of slot names to copy from the primary.

drop_replication_slot(name: str) → Tuple[bool, bool]

Drop a named slot from Postgres.

Parameters:

name – name of the slot to be dropped.

Returns:

a tuple of active and dropped. active is True if the slot is active, dropped is True if the slot was successfully dropped. If the slot was not found return False for both.

get_local_connection_cursor(**kwargs: Any) → Iterator[cursor | Cursor[Any]]

Create a new database connection to local server.

Create a non-blocking connection cursor to avoid the situation where an execution of the query of pg_replication_slot_advance takes longer than the timeout on a HA loop, which could cause a false failure state.

Parameters:

kwargs – Any keyword arguments to pass to psycopg.connect().

Yields:

connection cursor object, note implementation varies depending on version of psycopg.

ignore_replication_slot(cluster: Cluster, name: str) → bool

Check if slot name should not be managed by Patroni.

Parameters:
  • cluster – cluster state information object.

  • name – name of the slot to ignore

Returns:

True if slot name matches any slot specified in ignore_slots configuration, otherwise will pass through and return result of AbstractMPPHandler.ignore_replication_slot().
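The matching step can be sketched as follows, assuming each ignore_slots entry is a dictionary of slot attributes (for example name, type, database or plugin) and that a slot is ignored when every attribute of at least one entry matches. Both function names are hypothetical, and the fallback to AbstractMPPHandler.ignore_replication_slot() is left out.

```python
from typing import Any, Dict, List


def matches_rule(slot: Dict[str, Any], rule: Dict[str, Any]) -> bool:
    """True if every attribute listed in ``rule`` equals the slot's value."""
    return all(slot.get(key) == value for key, value in rule.items())


def is_ignored(slot: Dict[str, Any], ignore_slots: List[Dict[str, Any]]) -> bool:
    """True if any non-empty rule matches the slot."""
    # Empty rules are skipped so that they cannot match every slot.
    return any(matches_rule(slot, rule) for rule in ignore_slots if rule)
```

With this approach a rule such as {'type': 'logical', 'plugin': 'wal2json'} would exclude every matching logical slot, while a rule naming a single slot would exclude only that one.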

load_replication_slots() → None

Query replication slot information from the database and store it for processing by other tasks.

Note

Only supported from PostgreSQL version 9.4 onwards.

Store replication slot name, type, plugin, database and datoid. If PostgreSQL version is 10 or newer also store catalog_xmin and confirmed_flush_lsn.

When using logical slots, store information separately for slot synchronisation on replica nodes.

on_promote() → None

Entry point from HA cycle used when a standby node is to be promoted to primary.

Note

If logical replication slot synchronisation is enabled then slot advancement will be triggered. If any logical slots that were copied are yet to be confirmed as ready a warning message will be logged.

process_permanent_slots(slots: List[Dict[str, Any]]) → Dict[str, int]

Process replication slot information from the host and prepare information used in subsequent cluster tasks.

Note

This method solves three problems.

The cluster_info_query from Postgresql is executed every HA loop and returns information about all replication slots that exist on the current host.

Based on this information perform the following actions:

  1. For the primary we want to expose to DCS permanent logical slots, therefore build (and return) a dict that maps permanent logical slot names to confirmed_flush_lsn.

  2. Detect if one of the previously known permanent slots is missing and schedule resync.

  3. Update the local cache with the fresh catalog_xmin and confirmed_flush_lsn for every known slot.

This info is used when performing the check of logical slot readiness on standbys.

Parameters:

slots – replication slot information that exists on the current host.

Returns:

dictionary of logical slot names to confirmed_flush_lsn.
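The first two actions in the note can be sketched in a few lines. The input keys slot_name, type and confirmed_flush_lsn are assumptions about the shape of each slot dictionary, and summarise_permanent_slots is a hypothetical name; updating the local cache and scheduling the resync are omitted.

```python
from typing import Any, Dict, List, Set, Tuple


def summarise_permanent_slots(slots: List[Dict[str, Any]],
                              permanent_logical: Set[str]) -> Tuple[Dict[str, int], Set[str]]:
    """Return (name -> confirmed_flush_lsn, names of missing permanent slots)."""
    found = {slot['slot_name']: slot for slot in slots}
    positions = {
        name: found[name]['confirmed_flush_lsn']
        for name in permanent_logical
        if name in found
        and found[name].get('type') == 'logical'
        and found[name].get('confirmed_flush_lsn') is not None
    }
    # A permanent slot that was expected but not reported needs a resync.
    missing = permanent_logical - set(found)
    return positions, missing
```

A non-empty second element would be the signal to reload slot information on the next cycle.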

schedule(value: bool | None = None) → None

Schedule the loading of slot information from the database.

Parameters:

value – optional flag: False unschedules the load and True forces it. If omitted, the value will be True if this PostgreSQL node supports replication slots.
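The three-way behaviour of value can be summarised in a short sketch. The function resolve_schedule and its supports_slots argument are hypothetical; how the handler actually determines slot support is not shown on this page.

```python
from typing import Optional


def resolve_schedule(value: Optional[bool], supports_slots: bool) -> bool:
    """Resolve the effective scheduling flag.

    An explicit True or False wins; None falls back to whether the
    node supports replication slots.
    """
    return supports_slots if value is None else value
```

The explicit None check matters here, because a plain truthiness test would treat False and None the same way and make it impossible to unschedule.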

schedule_advance_slots(slots: Dict[str, Dict[str, int]]) → Tuple[bool, List[str]]

Wrapper to ensure slots advance daemon thread is started if not already.

Parameters:

slots – dictionary containing slot information.

Returns:

tuple with the result of the scheduling of slot advancement: failed and list of slots to copy.

sync_replication_slots(cluster: Cluster, tags: Tags) → List[str]

During the HA loop read, check and alter replication slots found in the cluster.

Read physical and logical slots from pg_replication_slots, then compare to those configured in the DCS. Drop any slots that do not match those required by configuration and are not configured as permanent. Create any missing physical slots, or advance their position according to feedback stored in DCS. If we are the primary then create logical slots, otherwise if logical slots are known and active create them on replica nodes by copying slot files from the primary.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • tags – reference to an object implementing Tags interface.

Returns:

list of logical replication slots names that should be copied from the primary.

patroni.postgresql.slots.compare_slots(s1: Dict[str, Any], s2: Dict[str, Any], dbid: str = 'database') → bool

Compare 2 replication slot objects for equality.

Note

If the first argument is a physical replication slot then only the type of the second slot is compared. If the first argument is another type (e.g. logical) then dbid and plugin are compared.

Parameters:
  • s1 – First slot dictionary to be compared.

  • s2 – Second slot dictionary to be compared.

  • dbid – Optional attribute to be compared when comparing logical replication slots.

Returns:

True if the slot types of s1 and s2 match and the type of s1 is physical, OR the types match AND the dbid and plugin attributes are equal.
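The comparison rule can be written out as a small sketch based only on the description above. The function is named compare_slots_sketch to make clear that it is an illustration and not the module's own function, and it assumes slot attributes are stored under the keys type and plugin plus whatever key dbid names.

```python
from typing import Any, Dict


def compare_slots_sketch(s1: Dict[str, Any], s2: Dict[str, Any],
                         dbid: str = 'database') -> bool:
    """Compare two slot dictionaries following the documented rule."""
    # The slot types must match in every case.
    if s1.get('type') != s2.get('type'):
        return False
    # For physical slots the type is the only attribute compared.
    if s1.get('type') == 'physical':
        return True
    # Otherwise (e.g. logical) the database identifier and plugin must match too.
    return s1.get(dbid) == s2.get(dbid) and s1.get('plugin') == s2.get('plugin')
```

Passing a different dbid, such as 'datoid', switches the database comparison from the database name to another identifying attribute.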