PostgreSQL Event Store

Currently, Disintegrate supports only PostgreSQL as its database implementation. This section explains how Disintegrate interacts with PostgreSQL, focusing on managing application changes and handling data migrations.

Postgres Database Schema

Disintegrate automatically generates the following tables when a PostgreSQL event store is created:

  • Event: Stores all events within the event stream.

    • event_id: Global identifier of the event.
    • event_type: Type of the event.
    • payload: Contains the event's payload.
    • inserted_at: Timestamp indicating when the event was written (in UTC time).
    • "Domain identifier" columns: Automatically created by the library when a field in the Event is marked with #[id]; used for indexing and query optimization (see the sketch after this list).
  • Event Listener: Maintains records of the last event processed for each listener:

    • id: Identifier of the event listener.
    • last_processed_id: ID of the last event processed by the event listener.
    • updated_at: Timestamp indicating the last time the table was updated.
  • Snapshot: Stores stream query payloads to speed up loading:

    • id: Identifier of the stream query.
    • name: Name of the stream query.
    • query: String representation of the query.
    • version: Last event ID processed by the stream query.
    • payload: Payload of the stream query.
    • inserted_at: Timestamp indicating when the snapshot was last written.
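
For instance, the domain identifier columns come from the fields annotated with #[id] in your event definition. The following is a minimal sketch; the DomainEvent enum and its fields are illustrative, not part of the library:

use disintegrate::Event;
use serde::{Deserialize, Serialize};

// Each field marked with `#[id]` gets a dedicated, indexed column
// on the event table (here, `cart_id` and `item_id`).
#[derive(Debug, Clone, PartialEq, Eq, Event, Serialize, Deserialize)]
enum DomainEvent {
    ItemAdded {
        #[id]
        cart_id: String,
        #[id]
        item_id: String,
        quantity: u32,
    },
    ItemRemoved {
        #[id]
        cart_id: String,
        #[id]
        item_id: String,
    },
}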

Append Events

The event stream append API requires three arguments:

  • A list of new events to append.
  • A stream query used to detect whether conflicting events have been appended to the event store.
  • The last_event_id obtained when the decision was made.

The append operation runs inside a PostgreSQL transaction using the SERIALIZABLE isolation level. Before inserting new events, the event table is queried using the provided StreamQuery and last_event_id to determine whether any events have been appended that would invalidate the state on which the Decision was based.

If such events are found, the operation fails with a concurrency error, indicating that the Decision was made on stale state. If the transaction commits successfully, it guarantees that no conflicting events occurred and that the new events were safely appended to the event store.
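
In code, a conflict surfaces as an error returned by the append call. The following is a minimal sketch: it assumes new_events, stream_query, and last_event_id were produced during the decision phase, and the handling in the comments is illustrative rather than the library's prescribed recovery strategy.

use disintegrate::EventStore;

// Append the new events, guarded by the stream query and the
// `last_event_id` observed when the decision was made.
match event_store.append(new_events, stream_query, last_event_id).await {
    Ok(_persisted) => {
        // The transaction committed: no conflicting events were
        // appended after `last_event_id`.
    }
    Err(_err) => {
        // A concurrency error means the decision was based on stale
        // state; reload the state and retry the decision.
    }
}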

Query Events

The query API requires a StreamQuery to fetch data from the event table, enabling events to be searched and filtered by the specified criteria. Each domain identifier is stored in a dedicated column and indexed to optimize query operations. The library automatically adds domain identifier columns when an Event field is tagged with the #[id] attribute. To properly manage the addition and removal of domain identifiers, consult the data migration section below.
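
As an example, assuming the query! macro form used in Disintegrate's examples (the event type and identifier value below are illustrative), a stream query that filters on a domain identifier runs against the indexed cart_id column rather than the JSON payload:

use disintegrate::query;

// Select every event whose `cart_id` domain identifier matches.
let stream_query = query!(DomainEvent; cart_id == "cart-1");
let events = event_store.stream(&stream_query);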

Data Migration

Manual data migration may be needed when the following changes are made to the event structure:

  1. Adding a New Domain Identifier: If you want to search old events by the new identifier, a data migration is required to populate the new identifier column for the existing events.
  2. Declaring an Existing Field as a Domain Identifier: A migration is necessary to populate this identifier for old events. Even though the event payload contains the domain identifier value, Disintegrate does not automatically populate the domain identifier column for already persisted events. To address this, the library provides an EventListener called PgIdIndexer. This helper processes old events and populates the missing domain identifier column in the event store, ensuring they are indexed correctly:
use std::time::Duration;

use disintegrate_postgres::{PgEventListener, PgEventListenerConfig, PgIdIndexer};

// Re-index already persisted events so the new domain identifier
// column is populated.
let id_indexer = PgIdIndexer::<DomainEvent>::new("index_existing_id", pool);
PgEventListener::builder(event_store)
    .register_listener(
        id_indexer,
        PgEventListenerConfig::poller(Duration::from_secs(10)).with_notifier(),
    )
    .start_with_shutdown(shutdown())
    .await?;
  3. Deleting an Existing Domain Identifier: Disintegrate does not automatically remove the domain identifier column. This deliberate design choice is made to support a blue-green rollout strategy.
warning

For cases 1 and 3, automation may be provided by the library in the future. Currently, users of the library need to make these changes manually in the database using SQL scripts.

Snapshots

If snapshotting is enabled, the library saves snapshots of stream queries in the snapshot table. A snapshot stores the result of a query and is refreshed at a configurable interval, expressed as the number of events that must be retrieved from the event store before the snapshot is updated.

// Refresh stream query snapshots every 10 events retrieved from
// the event store.
let decision_maker =
    disintegrate_postgres::decision_maker_with_snapshot(event_store.clone(), 10).await?;
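
Once constructed, the decision maker is used exactly as it would be without snapshotting; storing and reusing snapshots happens behind the scenes. A short sketch, where AddItem stands in for one of your own Decision types:

// `AddItem` is a placeholder Decision; snapshotting is transparent
// to the caller.
decision_maker
    .make(AddItem::new("user-1".to_string(), "item-1".to_string()))
    .await?;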

The library can automatically discard a snapshot under certain conditions:

  • Changes are made to the queries used to build it.
  • The library cannot deserialize the snapshot due to changes in the query state shape.
    • Addition of new fields to the state query.
    • Changes in the data type of existing fields.
warning

There may be situations where the shape of the query state stays the same even though the computation underneath has changed. For example, a field of type i32 may still exist, but the way its value is calculated has been altered. In such cases, you'll need to delete the snapshot manually.

Using the Migrator Utility

Disintegrate provides the Migrator utility to simplify database initialization and migration. Users can call this utility instead of manually running SQL:

use disintegrate_postgres::Migrator;

let migrator = Migrator::new(event_store.clone());

// Initialize the event store database if not already done
migrator.init_event_store().await?;

// Initialize event listener tables
migrator.init_listener().await?;

// Run migrations
migrator.migrate_v2_1_0_to_v3_0_0().await?;
migrator.migrate_v3_x_x_to_v4_0_0().await?;

By using the Migrator, you can upgrade your event store database without manually writing SQL. The migrate_ methods (migrate_v2_1_0_to_v3_0_0, migrate_v3_x_x_to_v4_0_0, etc.) are intended to be called only once during the migration. Once the migration is complete and verified, these calls can be removed from your codebase.