Skip to main content

@idpass/data-collect-core / PostgresEventStorageAdapter

Class: PostgresEventStorageAdapter

Defined in: storage/PostgresEventStorageAdapter.ts:90

PostgreSQL implementation of the EventStorageAdapter for server-side event persistence.

This adapter provides scalable, tamper-evident event storage using PostgreSQL. It is designed for production server deployments requiring robust data persistence, cryptographic integrity verification, and efficient event sourcing operations.

Key features:

  • ACID Transactions: Full PostgreSQL transaction support for data consistency.
  • Multi-Tenant Support: Complete tenant isolation using tenant_id partitioning.
  • Immutable Event Storage: All events are stored as immutable records.
  • Audit Trail Management: Comprehensive audit logging for compliance and debugging.
  • Merkle Root Storage: Stores Merkle roots for cryptographic integrity verification of the event log.
  • Sync Timestamp Management: Tracks timestamps for various synchronization operations (local, remote, external).
  • Scalable Architecture: Designed for production workloads with proper indexing.

Architecture:

  • Uses PostgreSQL connection pooling for performance and scalability.
  • Stores events and audit logs as JSONB documents for flexible schema evolution.
  • Implements tenant isolation at the database level for all event-related data.
  • Provides optimized queries with proper indexing strategies for efficient retrieval.

Database Schema Overview:

  • events: Stores FormSubmission records with guid as primary key, entity_guid, timestamp, and sync_level.
  • audit_log: Stores AuditLogEntry records with id as primary key, entity_guid, event_guid, and timestamp.
  • merkle_root: Stores the latest Merkle root for event log integrity.
  • last_remote_sync_timestamp, last_local_sync_timestamp, last_push_external_sync_timestamp, last_pull_external_sync_timestamp: Tables to store the timestamps of various synchronization operations.

Examples

Basic server setup:

import { PostgresEventStorageAdapter } from '@idpass/idpass-data-collect';

const adapter = new PostgresEventStorageAdapter(
'postgresql://user:pass@localhost:5432/datacollect',
'tenant-123'
);

await adapter.initialize();

// Save events
const eventsToSave = [{ guid: 'event-1', entityGuid: 'entity-1', timestamp: new Date().toISOString(), type: 'create-entity', data: {} }];
await adapter.saveEvents(eventsToSave);

// Retrieve events
const allEvents = await adapter.getEvents();
console.log('All events:', allEvents);

Production connection configuration:

const adapter = new PostgresEventStorageAdapter(
'postgresql://event_user:secure_pass@db.example.com:5432/eventstore_prod?sslmode=require',
process.env.TENANT_ID
);

try {
await adapter.initialize();
console.log('Event storage initialized successfully');
} catch (error) {
console.error('Event storage initialization failed:', error);
process.exit(1);
}

Implements

Constructors

Constructor

new PostgresEventStorageAdapter(connectionString, tenantId?): PostgresEventStorageAdapter

Defined in: storage/PostgresEventStorageAdapter.ts:94

Parameters

connectionString

string

tenantId?

string

Returns

PostgresEventStorageAdapter

Methods

closeConnection()

closeConnection(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:108

Closes all connections in the PostgreSQL connection pool.

Should be called during application shutdown to ensure graceful cleanup of database connections.

Returns

Promise<void>

A Promise that resolves when the connection is closed.

Implementation of

EventStorageAdapter.closeConnection


initialize()

initialize(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:126

Initializes the PostgreSQL database with required tables and schemas for events, audit logs, and Merkle roots.

Creates:

  • events table with guid as primary key and various indexes for efficient querying.
  • audit_log table for storing audit trail entries.
  • merkle_root table for storing the latest Merkle root.
  • Timestamp tables for tracking different synchronization points.

This method is idempotent and safe to call multiple times.

Returns

Promise<void>

A Promise that resolves when the database is successfully initialized.

Throws

When database connection fails or table creation fails.

Implementation of

EventStorageAdapter.initialize


saveEvents()

saveEvents(events): Promise<string[]>

Defined in: storage/PostgresEventStorageAdapter.ts:212

Saves an array of FormSubmission events to the event store.

Events are saved within a transaction to ensure atomicity. If any event fails to save, the entire transaction is rolled back.

Parameters

events

FormSubmission[]

An array of FormSubmission objects to save.

Returns

Promise<string[]>

A Promise that resolves with an array of GUIDs of the successfully saved events.

Throws

If the database transaction fails during the save operation.

Implementation of

EventStorageAdapter.saveEvents


getEvents()

getEvents(): Promise<FormSubmission[]>

Defined in: storage/PostgresEventStorageAdapter.ts:252

Retrieves all FormSubmission events for the current tenant from the event store.

Events are mapped to ensure timestamp is a valid ISO string.

Returns

Promise<FormSubmission[]>

A Promise that resolves with an array of all FormSubmission events.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEvents


saveAuditLog()

saveAuditLog(entries): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:286

Saves an array of AuditLogEntry entries to the audit log store.

Entries are saved within a transaction to ensure atomicity.

Parameters

entries

AuditLogEntry[]

An array of AuditLogEntry objects to save.

Returns

Promise<void>

A Promise that resolves when the audit log entries are successfully saved.

Throws

If the database transaction fails during the save operation.

Implementation of

EventStorageAdapter.saveAuditLog


getAuditLog()

getAuditLog(): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:321

Retrieves all AuditLogEntry entries for the current tenant from the audit log store.

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of all AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditLog


saveMerkleRoot()

saveMerkleRoot(root): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:352

Saves the Merkle root to the Merkle root store.

If a Merkle root already exists for the tenant, this operation will do nothing (ON CONFLICT DO NOTHING).

Parameters

root

string

The Merkle root string to save.

Returns

Promise<void>

A Promise that resolves when the Merkle root is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.saveMerkleRoot


getMerkleRoot()

getMerkleRoot(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:375

Retrieves the latest stored Merkle root for the current tenant.

Returns

Promise<string>

A Promise that resolves with the Merkle root string, or an empty string if no root exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getMerkleRoot


updateEventSyncLevel()

updateEventSyncLevel(id, syncLevel): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:395

Updates the syncLevel for a specific event identified by its GUID.

Parameters

id

string

The GUID of the event to update.

syncLevel

SyncLevel

The new SyncLevel to set for the event.

Returns

Promise<void>

A Promise that resolves when the event's sync level is updated.

Throws

If the database update fails.

Implementation of

EventStorageAdapter.updateEventSyncLevel


updateAuditLogSyncLevel()

updateAuditLogSyncLevel(entityGuid, syncLevel): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:412

Updates the syncLevel for audit log entries associated with a given entity GUID.

Parameters

entityGuid

string

The GUID of the entity whose associated audit log entries' sync levels need to be updated.

syncLevel

SyncLevel

The new SyncLevel to set for the audit log entries.

Returns

Promise<void>

A Promise that resolves when the update is complete.

Throws

If the database update fails.

Implementation of

EventStorageAdapter.updateAuditLogSyncLevel


getEventsSince()

getEventsSince(timestamp): Promise<FormSubmission[]>

Defined in: storage/PostgresEventStorageAdapter.ts:430

Retrieves FormSubmission events that have occurred since a specified timestamp.

Events are ordered by timestamp in ascending order (oldest first).

Parameters

timestamp

The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).

string | Date

Returns

Promise<FormSubmission[]>

A Promise that resolves with an array of FormSubmission events.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEventsSince


getEventsSincePagination()

getEventsSincePagination(timestamp, limit): Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>

Defined in: storage/PostgresEventStorageAdapter.ts:463

Retrieves FormSubmission events that have occurred since a specified timestamp with pagination support.

Events are ordered by timestamp in ascending order (oldest first) and limited by limit.

Parameters

timestamp

The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).

string | Date

limit

number = 100

The maximum number of events to retrieve in a single page. Defaults to 100.

Returns

Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>

A Promise that resolves with an object containing an array of FormSubmission events and the nextCursor for pagination.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEventsSincePagination


getAuditLogsSince()

getAuditLogsSince(timestamp): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:508

Retrieves AuditLogEntry entries that have occurred since a specified timestamp.

Audit logs are ordered by timestamp in descending order (newest first).

Parameters

timestamp

string

The ISO 8601 timestamp string from which to retrieve audit logs (exclusive).

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditLogsSince


updateSyncLevelFromEvents()

updateSyncLevelFromEvents(events): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:539

Updates the syncLevel for a batch of FormSubmission events based on their GUIDs.

This operation is performed within a transaction for atomicity.

Parameters

events

FormSubmission[]

An array of FormSubmission objects, each containing the GUID and the new syncLevel.

Returns

Promise<void>

A Promise that resolves when all specified events' sync levels are updated.

Throws

If the database transaction fails during the update operation.

Implementation of

EventStorageAdapter.updateSyncLevelFromEvents


getLastRemoteSyncTimestamp()

getLastRemoteSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:565

Retrieves the timestamp of the last successful remote synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastRemoteSyncTimestamp


setLastRemoteSyncTimestamp()

setLastRemoteSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:587

Sets the timestamp of the last successful remote synchronization.

This operation deletes any existing remote sync timestamp for the tenant and inserts the new one.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastRemoteSyncTimestamp


getLastLocalSyncTimestamp()

getLastLocalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:606

Retrieves the timestamp of the last successful local synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastLocalSyncTimestamp


setLastLocalSyncTimestamp()

setLastLocalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:628

Sets the timestamp of the last successful local synchronization.

This operation deletes any existing local sync timestamp for the tenant and inserts the new one.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastLocalSyncTimestamp


getLastPullExternalSyncTimestamp()

getLastPullExternalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:647

Retrieves the timestamp of the last successful external pull synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastPullExternalSyncTimestamp


setLastPullExternalSyncTimestamp()

setLastPullExternalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:669

Sets the timestamp of the last successful external pull synchronization.

This operation deletes any existing external pull sync timestamp for the tenant and inserts the new one.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastPullExternalSyncTimestamp


getLastPushExternalSyncTimestamp()

getLastPushExternalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:688

Retrieves the timestamp of the last successful external push synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastPushExternalSyncTimestamp


setLastPushExternalSyncTimestamp()

setLastPushExternalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:710

Sets the timestamp of the last successful external push synchronization.

This operation deletes any existing external push sync timestamp for the tenant and inserts the new one.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastPushExternalSyncTimestamp


isEventExisted()

isEventExisted(guid): Promise<boolean>

Defined in: storage/PostgresEventStorageAdapter.ts:730

Checks if an event with the given GUID exists in the event store for the current tenant.

Parameters

guid

string

The GUID of the event to check.

Returns

Promise<boolean>

A Promise that resolves to true if the event exists, false otherwise.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.isEventExisted


getAuditTrailByEntityGuid()

getAuditTrailByEntityGuid(entityGuid): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:752

Retrieves the audit trail for a specific entity, identified by its entityGuid.

Audit log entries are ordered by timestamp in descending order (newest first).

Parameters

entityGuid

string

The GUID of the entity to retrieve the audit trail for.

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditTrailByEntityGuid


clearStore()

clearStore(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:780

Clears all events, audit logs, Merkle roots, and sync timestamps for the current tenant from the store.

Returns

Promise<void>

A Promise that resolves when all data is cleared.

Throws

If the database deletion fails.

Implementation of

EventStorageAdapter.clearStore