Class: PostgresEventStorageAdapter

Defined in: storage/PostgresEventStorageAdapter.ts:93

PostgreSQL implementation of the EventStorageAdapter for server-side event persistence.

This adapter provides scalable, tamper-evident event storage using PostgreSQL. It is designed for production server deployments requiring robust data persistence and efficient event sourcing operations.

Key features:

  • ACID Transactions: Full PostgreSQL transaction support for data consistency.
  • Multi-Tenant Support: Complete tenant isolation using tenant_id partitioning.
  • Immutable Event Storage: All events are stored as immutable records.
  • Audit Trail Management: Comprehensive audit logging for compliance and debugging.
  • Sync Timestamp Management: Tracks timestamps for various synchronization operations (local, remote, external).
  • Scalable Architecture: Designed for production workloads with proper indexing.

Architecture:

  • Uses PostgreSQL connection pooling for performance and scalability.
  • Stores events and audit logs as JSONB documents for flexible schema evolution.
  • Implements tenant isolation at the database level for all event-related data.
  • Provides optimized queries with proper indexing strategies for efficient retrieval.

Database Schema Overview:

  • events: Stores FormSubmission records with guid as primary key, entity_guid, timestamp, and sync_level.
  • audit_log: Stores AuditLogEntry records with id as primary key, entity_guid, event_guid, and timestamp.
  • sync_metadata: Key-value table for storing sync timestamps and other metadata, keyed by (tenant_id, key).

Examples

Basic server setup:

import { PostgresEventStorageAdapter } from '@idpass/data-collect-core';

const adapter = new PostgresEventStorageAdapter(
  'postgresql://user:pass@localhost:5432/datacollect',
  'tenant-123'
);

await adapter.initialize();

// Save events
const eventsToSave = [
  {
    guid: 'event-1',
    entityGuid: 'entity-1',
    timestamp: new Date().toISOString(),
    type: 'create-entity',
    data: {}
  }
];
await adapter.saveEvents(eventsToSave);

// Retrieve events
const allEvents = await adapter.getEvents();
console.log('All events:', allEvents);

Production connection configuration:

const adapter = new PostgresEventStorageAdapter(
  'postgresql://event_user:secure_pass@db.example.com:5432/eventstore_prod?sslmode=require',
  process.env.TENANT_ID
);

try {
  await adapter.initialize();
  console.log('Event storage initialized successfully');
} catch (error) {
  console.error('Event storage initialization failed:', error);
  process.exit(1);
}

Implements

EventStorageAdapter

Constructors

Constructor

new PostgresEventStorageAdapter(connectionString, tenantId?): PostgresEventStorageAdapter

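Defined in: storage/PostgresEventStorageAdapter.ts:99

Creates a PostgresEventStorageAdapter from a PostgreSQL connection string. When constructed this way, the adapter owns the connection pool, and closeConnection() closes it.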

Parameters

connectionString

string

tenantId?

string

Returns

PostgresEventStorageAdapter

Constructor

new PostgresEventStorageAdapter(pool, tenantId?): PostgresEventStorageAdapter

Defined in: storage/PostgresEventStorageAdapter.ts:104

Creates a PostgresEventStorageAdapter using an existing Pool. When constructed this way, the adapter does NOT own the pool and will not close it.

Parameters

pool

Pool

tenantId?

string

Returns

PostgresEventStorageAdapter

Methods

getPool()

getPool(): Pool

Defined in: storage/PostgresEventStorageAdapter.ts:121

Returns the underlying pg Pool used by this adapter. Useful for creating transactional EDM stacks that share a single connection pool.

Returns

Pool


setDrizzleInstance()

setDrizzleInstance(db): void

Defined in: storage/PostgresEventStorageAdapter.ts:132

Replaces the internal Drizzle database instance. Used to inject a Drizzle transaction object so that all operations on this adapter participate in an external transaction.

Parameters

db

DrizzleDatabase

A Drizzle database or transaction instance.

Returns

void


closeConnection()

closeConnection(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:143

Closes all connections in the PostgreSQL connection pool. Only closes the pool if this adapter owns it (created from a connection string). When constructed with an external Pool, the caller is responsible for pool lifecycle.

Returns

Promise<void>

A Promise that resolves once the adapter-owned pool is closed. An externally supplied Pool is left open.

Implementation of

EventStorageAdapter.closeConnection
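Because closeConnection() leaves an externally supplied Pool open, a caller can invoke it unconditionally when it is done with the adapter. The following is a minimal sketch of that pattern, not part of the library: Closable and withStorage are hypothetical names, and Closable only mirrors the closeConnection() signature documented above.

```typescript
// Structural stand-in for the single method this helper needs (hypothetical name).
interface Closable {
  closeConnection(): Promise<void>;
}

// Runs `work` with the adapter and always calls closeConnection() afterwards,
// whether `work` resolves or throws.
async function withStorage<A extends Closable, R>(
  adapter: A,
  work: (adapter: A) => Promise<R>
): Promise<R> {
  try {
    return await work(adapter);
  } finally {
    await adapter.closeConnection();
  }
}
```

With an adapter built from a connection string this releases the pool. With an adapter built from a shared Pool, the caller still has to end that pool separately.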


initialize()

initialize(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:162

Initializes the PostgreSQL database with required tables and schemas for events and audit logs.

Creates:

  • events table with guid as primary key and various indexes for efficient querying.
  • audit_log table for storing audit trail entries.
  • sync_metadata table for tracking different synchronization timestamps.

This method is idempotent and safe to call multiple times.

Returns

Promise<void>

A Promise that resolves when the database is successfully initialized.

Throws

If the database connection or table creation fails.

Implementation of

EventStorageAdapter.initialize


saveEvents()

saveEvents(eventList): Promise<string[]>

Defined in: storage/PostgresEventStorageAdapter.ts:232

Saves an array of FormSubmission events to the event store.

Events are saved within a transaction to ensure atomicity. If any event fails to save, the entire transaction is rolled back.

Parameters

eventList

FormSubmission[]

An array of FormSubmission objects to save.

Returns

Promise<string[]>

A Promise that resolves with an array of GUIDs of the successfully saved events.

Throws

If the database transaction fails during the save operation.

Implementation of

EventStorageAdapter.saveEvents


getEvents()

getEvents(): Promise<FormSubmission[]>

Defined in: storage/PostgresEventStorageAdapter.ts:266

Retrieves all FormSubmission events for the current tenant from the event store.

Events are mapped to ensure timestamp is a valid ISO string.

Returns

Promise<FormSubmission[]>

A Promise that resolves with an array of all FormSubmission events.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEvents
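In an event-sourcing workflow the full event list is often regrouped into one stream per entity. The sketch below shows one way to do that with the result of getEvents(). It assumes each event carries entityGuid and an ISO 8601 UTC timestamp string, as in the example event at the top of this page. This page does not state the order in which getEvents() returns rows, so the sketch sorts each stream itself. EntityEvent, EventReader, and eventsByEntity are hypothetical names.

```typescript
// Minimal event shape assumed by this sketch (hypothetical name).
interface EntityEvent {
  guid: string;
  entityGuid: string;
  timestamp: string; // ISO 8601 string
}

// Structural stand-in for the one adapter method used here (hypothetical name).
interface EventReader<E extends EntityEvent> {
  getEvents(): Promise<E[]>;
}

// Groups all events by entityGuid and orders each group oldest first.
async function eventsByEntity<E extends EntityEvent>(
  reader: EventReader<E>
): Promise<Map<string, E[]>> {
  const streams = new Map<string, E[]>();
  for (const event of await reader.getEvents()) {
    const stream = streams.get(event.entityGuid) ?? [];
    stream.push(event);
    streams.set(event.entityGuid, stream);
  }
  // ISO 8601 UTC strings of equal precision sort chronologically as plain strings.
  streams.forEach((stream) => {
    stream.sort((a, b) =>
      a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0
    );
  });
  return streams;
}
```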


saveAuditLog()

saveAuditLog(entries): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:306

Saves an array of AuditLogEntry entries to the audit log store.

Entries are saved within a transaction to ensure atomicity.

Parameters

entries

AuditLogEntry[]

An array of AuditLogEntry objects to save.

Returns

Promise<void>

A Promise that resolves when the audit log entries are successfully saved.

Throws

If the database transaction fails during the save operation.

Implementation of

EventStorageAdapter.saveAuditLog


getAuditLog()

getAuditLog(): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:330

Retrieves all AuditLogEntry entries for the current tenant from the audit log store.

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of all AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditLog


updateEventSyncLevel()

updateEventSyncLevel(id, syncLevel): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:365

Updates the syncLevel for a specific event identified by its GUID.

Parameters

id

string

The GUID of the event to update.

syncLevel

SyncLevel

The new SyncLevel to set for the event.

Returns

Promise<void>

A Promise that resolves when the event's sync level is updated.

Throws

If the database update fails.

Implementation of

EventStorageAdapter.updateEventSyncLevel


updateAuditLogSyncLevel()

updateAuditLogSyncLevel(entityGuid, syncLevel): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:377

Updates the syncLevel for audit log entries associated with a given entity GUID.

Parameters

entityGuid

string

The GUID of the entity whose associated audit log entries' sync levels need to be updated.

syncLevel

SyncLevel

The new SyncLevel to set for the audit log entries.

Returns

Promise<void>

A Promise that resolves when the update is complete.

Throws

If the database update fails.

Implementation of

EventStorageAdapter.updateAuditLogSyncLevel


getEventsSince()

getEventsSince(timestamp): Promise<FormSubmission[]>

Defined in: storage/PostgresEventStorageAdapter.ts:394

Retrieves FormSubmission events that have occurred since a specified timestamp.

Events are ordered by timestamp in ascending order (oldest first).

Parameters

timestamp

string | Date

The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).

Returns

Promise<FormSubmission[]>

A Promise that resolves with an array of FormSubmission events.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEventsSince


getEventsSincePagination()

getEventsSincePagination(timestamp, limit?): Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>

Defined in: storage/PostgresEventStorageAdapter.ts:435

Retrieves FormSubmission events that have occurred since a specified timestamp with pagination support.

Events are ordered by timestamp in ascending order (oldest first) and limited by limit.

Parameters

timestamp

string | Date

The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).

limit?

number = 100

The maximum number of events to retrieve in a single page. Defaults to 100.

Returns

Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>

A Promise that resolves with an object containing an array of FormSubmission events and the nextCursor for pagination.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getEventsSincePagination
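The sketch below shows one way to drain every page. It rests on two assumptions that this page does not confirm: that nextCursor is meant to be passed back as the timestamp argument of the next call, and that a null nextCursor marks the last page. PagedEventSource and fetchAllEventsSince are hypothetical names, and the type parameter T stands in for FormSubmission.

```typescript
// Structural stand-in for the one adapter method used here (hypothetical name).
interface PagedEventSource<T> {
  getEventsSincePagination(
    timestamp: string | Date,
    limit?: number
  ): Promise<{ events: T[]; nextCursor: string | Date | null }>;
}

// Collects every event after `since`, one page at a time.
// Assumption: nextCursor is fed back as the next `timestamp`; null ends the loop.
async function fetchAllEventsSince<T>(
  source: PagedEventSource<T>,
  since: string | Date,
  pageSize = 100
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | Date | null = since;
  while (cursor !== null) {
    const page = await source.getEventsSincePagination(cursor, pageSize);
    all.push(...page.events);
    // Also stop on an empty page, so a cursor that never advances cannot loop forever.
    cursor = page.events.length > 0 ? page.nextCursor : null;
  }
  return all;
}
```

For large backlogs it is usually better to process each page as it arrives rather than accumulate everything in memory; the accumulation here only keeps the example short.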


getAuditLogsSince()

getAuditLogsSince(timestamp): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:502

Retrieves AuditLogEntry entries that have occurred since a specified timestamp.

Audit logs are ordered by timestamp in descending order (newest first).

Parameters

timestamp

string

The ISO 8601 timestamp string from which to retrieve audit logs (exclusive).

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditLogsSince


updateSyncLevelFromEvents()

updateSyncLevelFromEvents(eventList): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:538

Updates the syncLevel for a batch of FormSubmission events based on their GUIDs.

This operation is performed within a transaction for atomicity.

Parameters

eventList

FormSubmission[]

An array of FormSubmission objects, each containing the GUID and the new syncLevel.

Returns

Promise<void>

A Promise that resolves when all specified events' sync levels are updated.

Throws

If the database transaction fails during the update operation.

Implementation of

EventStorageAdapter.updateSyncLevelFromEvents
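As an illustration, a caller that has just synchronized a batch might stamp every event with the same level before handing the batch to updateSyncLevelFromEvents(). This is a sketch under assumptions: the members of SyncLevel are not listed on this page, so the level is a generic parameter supplied by the caller, and SyncLevelStore and markEventsAtLevel are hypothetical names.

```typescript
// Structural stand-in for the one adapter method used here (hypothetical name).
interface SyncLevelStore<E> {
  updateSyncLevelFromEvents(eventList: E[]): Promise<void>;
}

// Copies each event with `syncLevel` set to `level`, then submits the copies
// in a single call. The input array and its objects are not mutated.
async function markEventsAtLevel<L, E extends { guid: string; syncLevel?: L }>(
  store: SyncLevelStore<E>,
  events: E[],
  level: L
): Promise<void> {
  if (events.length === 0) return; // nothing to update
  const updated = events.map((event) => ({ ...event, syncLevel: level }));
  await store.updateSyncLevelFromEvents(updated);
}
```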


getLastRemoteSyncTimestamp()

getLastRemoteSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:555

Retrieves the timestamp of the last successful remote synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastRemoteSyncTimestamp


setLastRemoteSyncTimestamp()

setLastRemoteSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:568

Sets the timestamp of the last successful remote synchronization.

Uses an UPSERT to insert or update the timestamp in the sync_metadata table.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastRemoteSyncTimestamp
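The getter and setter above can be combined with getEventsSince() into an incremental push loop. The following is a minimal sketch under stated assumptions, not the library's own sync logic: the push callback is hypothetical, an empty stored timestamp is treated as "never synced" and replaced with the Unix epoch, and each event is assumed to expose an ISO 8601 timestamp string. RemoteSyncStore, TimestampedEvent, and pushNewEvents are hypothetical names.

```typescript
// Minimal event shape assumed by this sketch (hypothetical name).
interface TimestampedEvent {
  guid: string;
  timestamp: string; // ISO 8601 string
}

// Structural stand-in for the three adapter methods used here (hypothetical name).
interface RemoteSyncStore<E extends TimestampedEvent> {
  getLastRemoteSyncTimestamp(): Promise<string>;
  setLastRemoteSyncTimestamp(timestamp: string): Promise<void>;
  getEventsSince(timestamp: string | Date): Promise<E[]>;
}

// Pushes events newer than the stored marker and returns how many were pushed.
async function pushNewEvents<E extends TimestampedEvent>(
  store: RemoteSyncStore<E>,
  push: (events: E[]) => Promise<void> // hypothetical transport callback
): Promise<number> {
  const last = await store.getLastRemoteSyncTimestamp();
  // An empty string means no sync has been recorded; start from the Unix epoch.
  const since = last === "" ? new Date(0).toISOString() : last;
  const pending = await store.getEventsSince(since); // ordered oldest first
  if (pending.length === 0) return 0;
  await push(pending);
  // Advance the marker only after the push succeeded, using the newest event.
  await store.setLastRemoteSyncTimestamp(pending[pending.length - 1]!.timestamp);
  return pending.length;
}
```

Advancing the marker after the push means a failed push is retried on the next run, at the cost of possibly re-sending events if the failure happened after the remote side accepted them.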


getLastLocalSyncTimestamp()

getLastLocalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:578

Retrieves the timestamp of the last successful local synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastLocalSyncTimestamp


setLastLocalSyncTimestamp()

setLastLocalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:591

Sets the timestamp of the last successful local synchronization.

Uses an UPSERT to insert or update the timestamp in the sync_metadata table.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastLocalSyncTimestamp


getLastPullExternalSyncTimestamp()

getLastPullExternalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:601

Retrieves the timestamp of the last successful external pull synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastPullExternalSyncTimestamp


setLastPullExternalSyncTimestamp()

setLastPullExternalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:614

Sets the timestamp of the last successful external pull synchronization.

Uses an UPSERT to insert or update the timestamp in the sync_metadata table.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastPullExternalSyncTimestamp


getLastPushExternalSyncTimestamp()

getLastPushExternalSyncTimestamp(): Promise<string>

Defined in: storage/PostgresEventStorageAdapter.ts:624

Retrieves the timestamp of the last successful external push synchronization.

Returns

Promise<string>

A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getLastPushExternalSyncTimestamp


setLastPushExternalSyncTimestamp()

setLastPushExternalSyncTimestamp(timestamp): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:637

Sets the timestamp of the last successful external push synchronization.

Uses an UPSERT to insert or update the timestamp in the sync_metadata table.

Parameters

timestamp

string

The timestamp string to save.

Returns

Promise<void>

A Promise that resolves when the timestamp is successfully saved.

Throws

If the database operation fails.

Implementation of

EventStorageAdapter.setLastPushExternalSyncTimestamp


isEventExisted()

isEventExisted(guid): Promise<boolean>

Defined in: storage/PostgresEventStorageAdapter.ts:648

Checks if an event with the given GUID exists in the event store for the current tenant.

Parameters

guid

string

The GUID of the event to check.

Returns

Promise<boolean>

A Promise that resolves to true if the event exists, false otherwise.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.isEventExisted
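One possible use of isEventExisted() is to filter an incoming batch before calling saveEvents(), so that replayed events are skipped. The sketch below assumes events are identified solely by guid. DedupingStore and saveNewEventsOnly are hypothetical names. The check and the save are separate calls, so this is not atomic under concurrent writers.

```typescript
// Structural stand-in for the two adapter methods used here (hypothetical name).
interface DedupingStore<E extends { guid: string }> {
  isEventExisted(guid: string): Promise<boolean>;
  saveEvents(eventList: E[]): Promise<string[]>;
}

// Saves only the events whose GUID is not already stored, and returns
// whatever saveEvents() reports for the events that were actually written.
async function saveNewEventsOnly<E extends { guid: string }>(
  store: DedupingStore<E>,
  incoming: E[]
): Promise<string[]> {
  const fresh: E[] = [];
  const seenInBatch = new Set<string>();
  for (const event of incoming) {
    if (seenInBatch.has(event.guid)) continue; // duplicate within this batch
    seenInBatch.add(event.guid);
    if (!(await store.isEventExisted(event.guid))) {
      fresh.push(event);
    }
  }
  if (fresh.length === 0) return [];
  return store.saveEvents(fresh);
}
```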


getAuditTrailByEntityGuid()

getAuditTrailByEntityGuid(entityGuid): Promise<AuditLogEntry[]>

Defined in: storage/PostgresEventStorageAdapter.ts:666

Retrieves the audit trail for a specific entity, identified by its entityGuid.

Audit log entries are ordered by timestamp in descending order (newest first).

Parameters

entityGuid

string

The GUID of the entity to retrieve the audit trail for.

Returns

Promise<AuditLogEntry[]>

A Promise that resolves with an array of AuditLogEntry entries.

Throws

If the database query fails.

Implementation of

EventStorageAdapter.getAuditTrailByEntityGuid


clearStore()

clearStore(): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:700

Clears all events, audit logs, and sync metadata for the current tenant from the store.

Returns

Promise<void>

A Promise that resolves when all data is cleared.

Throws

If the database deletion fails.

Implementation of

EventStorageAdapter.clearStore


persistHashAnchor()

persistHashAnchor(hash): Promise<void>

Defined in: storage/PostgresEventStorageAdapter.ts:712

Persists the latest hash anchor for tamper detection on restart.

Parameters

hash

string

The hash string to persist.

Returns

Promise<void>

A Promise that resolves when the hash is persisted.

Implementation of

EventStorageAdapter.persistHashAnchor


getPersistedHashAnchor()

getPersistedHashAnchor(): Promise<string | null>

Defined in: storage/PostgresEventStorageAdapter.ts:721

Retrieves the previously persisted hash anchor, or null if none exists.

Returns

Promise<string | null>

The persisted hash string, or null if no anchor has been saved.

Implementation of

EventStorageAdapter.getPersistedHashAnchor
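A restart-time check built on these two methods might look like the sketch below. This page does not describe how the hash is derived from stored events, so the sketch takes a caller-supplied computeCurrentHash function. That function, HashAnchorStore, AnchorCheck, and checkAnchorOnStartup are all hypothetical names. Persisting the current hash when no anchor exists is a design choice of this sketch, not documented behavior.

```typescript
// Structural stand-in for the two adapter methods used here (hypothetical name).
interface HashAnchorStore {
  persistHashAnchor(hash: string): Promise<void>;
  getPersistedHashAnchor(): Promise<string | null>;
}

type AnchorCheck = "no-anchor" | "match" | "mismatch";

// Compares the persisted anchor with a freshly computed hash.
// Assumption: the caller knows how to recompute the hash from stored events.
async function checkAnchorOnStartup(
  store: HashAnchorStore,
  computeCurrentHash: () => Promise<string>
): Promise<AnchorCheck> {
  const persisted = await store.getPersistedHashAnchor();
  const current = await computeCurrentHash();
  if (persisted === null) {
    // First run: record the current hash so later restarts have a baseline.
    await store.persistHashAnchor(current);
    return "no-anchor";
  }
  return persisted === current ? "match" : "mismatch";
}
```

A "mismatch" result indicates that stored events no longer hash to the anchored value. How to respond, for example by refusing to start or raising an alert, is left to the caller.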