@idpass/data-collect-core / PostgresEventStorageAdapter
Class: PostgresEventStorageAdapter
Defined in: storage/PostgresEventStorageAdapter.ts:90
PostgreSQL implementation of the EventStorageAdapter for server-side event persistence.
This adapter provides scalable, tamper-evident event storage using PostgreSQL. It is designed for production server deployments requiring robust data persistence, cryptographic integrity verification, and efficient event sourcing operations.
Key features:
- ACID Transactions: Full PostgreSQL transaction support for data consistency.
- Multi-Tenant Support: Complete tenant isolation using tenant_id partitioning.
- Immutable Event Storage: All events are stored as immutable records.
- Audit Trail Management: Comprehensive audit logging for compliance and debugging.
- Merkle Root Storage: Stores Merkle roots for cryptographic integrity verification of the event log.
- Sync Timestamp Management: Tracks timestamps for various synchronization operations (local, remote, external).
- Scalable Architecture: Designed for production workloads with proper indexing.
Architecture:
- Uses PostgreSQL connection pooling for performance and scalability.
- Stores events and audit logs as JSONB documents for flexible schema evolution.
- Implements tenant isolation at the database level for all event-related data.
- Provides optimized queries with proper indexing strategies for efficient retrieval.
Database Schema Overview:
- events: Stores FormSubmission records with guid as primary key, entity_guid, timestamp, and sync_level.
- audit_log: Stores AuditLogEntry records with id as primary key, entity_guid, event_guid, and timestamp.
- merkle_root: Stores the latest Merkle root for event log integrity.
- last_remote_sync_timestamp, last_local_sync_timestamp, last_push_external_sync_timestamp, last_pull_external_sync_timestamp: Tables to store the timestamps of various synchronization operations.
Examples
Basic server setup:
import { PostgresEventStorageAdapter } from '@idpass/idpass-data-collect';
const adapter = new PostgresEventStorageAdapter(
  'postgresql://user:pass@localhost:5432/datacollect',
  'tenant-123'
);
await adapter.initialize();
// Save events
const eventsToSave = [{ guid: 'event-1', entityGuid: 'entity-1', timestamp: new Date().toISOString(), type: 'create-entity', data: {} }];
await adapter.saveEvents(eventsToSave);
// Retrieve events
const allEvents = await adapter.getEvents();
console.log('All events:', allEvents);
Production connection configuration:
const adapter = new PostgresEventStorageAdapter(
  'postgresql://event_user:secure_pass@db.example.com:5432/eventstore_prod?sslmode=require',
  process.env.TENANT_ID
);
try {
  await adapter.initialize();
  console.log('Event storage initialized successfully');
} catch (error) {
  console.error('Event storage initialization failed:', error);
  process.exit(1);
}
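The production example above embeds credentials directly in the connection string. As a minimal sketch, the string could instead be assembled from separate settings, percent-encoding the user name and password so that characters such as @ or / do not break the URI. The DbSettings shape and the buildConnectionString helper are hypothetical and are not part of the library:

```typescript
// Hypothetical settings shape; field names are assumptions for this sketch.
interface DbSettings {
  user: string;
  password: string;
  host: string;
  port: number;
  database: string;
  requireSsl: boolean;
}

// Builds a PostgreSQL connection URI, percent-encoding the parts that may
// contain reserved characters.
function buildConnectionString(settings: DbSettings): string {
  const user = encodeURIComponent(settings.user);
  const password = encodeURIComponent(settings.password);
  const database = encodeURIComponent(settings.database);
  const query = settings.requireSsl ? "?sslmode=require" : "";
  return `postgresql://${user}:${password}@${settings.host}:${settings.port}/${database}${query}`;
}
```

The result can be passed as the first constructor argument. In a deployment the individual fields would typically come from environment variables or a secret store.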
Implements
EventStorageAdapter
Constructors
Constructor
new PostgresEventStorageAdapter(connectionString, tenantId?): PostgresEventStorageAdapter
Defined in: storage/PostgresEventStorageAdapter.ts:94
Parameters
connectionString
string
tenantId?
string
Returns
PostgresEventStorageAdapter
Methods
closeConnection()
closeConnection(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:108
Closes all connections in the PostgreSQL connection pool.
Should be called during application shutdown to ensure graceful cleanup of database connections.
Returns
Promise<void>
A Promise that resolves when the connection is closed.
Implementation of
EventStorageAdapter.closeConnection
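Since closeConnection() is meant to run during application shutdown, one way to wire it up is a shutdown function that closes the pool at most once, even if several signals arrive. This is a sketch under assumptions: makeShutdown and the Closable type are hypothetical names, and the signal wiring is shown only as a comment.

```typescript
// Minimal structural type: only the method this sketch needs.
interface Closable {
  closeConnection(): Promise<void>;
}

// Returns a function that triggers closeConnection() on the first call and
// hands back the same promise on every later call.
function makeShutdown(adapter: Closable): () => Promise<void> {
  let closing: Promise<void> | null = null;
  return () => {
    if (closing === null) {
      closing = adapter.closeConnection();
    }
    return closing;
  };
}

// Example wiring in Node.js, left as a comment so the sketch has no side effects:
// const shutdown = makeShutdown(adapter);
// process.once('SIGTERM', () => void shutdown().then(() => process.exit(0)));
```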
initialize()
initialize(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:126
Initializes the PostgreSQL database with required tables and schemas for events, audit logs, and Merkle roots.
Creates:
- events table with guid as primary key and various indexes for efficient querying.
- audit_log table for storing audit trail entries.
- merkle_root table for storing the latest Merkle root.
- Timestamp tables for tracking different synchronization points.
This method is idempotent and safe to call multiple times.
Returns
Promise<void>
A Promise that resolves when the database is successfully initialized.
Throws
When database connection fails or table creation fails.
Implementation of
EventStorageAdapter.initialize
saveEvents()
saveEvents(events): Promise<string[]>
Defined in: storage/PostgresEventStorageAdapter.ts:212
Saves an array of FormSubmission events to the event store.
Events are saved within a transaction to ensure atomicity. If any event fails to save, the entire transaction is rolled back.
Parameters
events
An array of FormSubmission objects to save.
Returns
Promise<string[]>
A Promise that resolves with an array of GUIDs of the successfully saved events.
Throws
If the database transaction fails during the save operation.
Implementation of
EventStorageAdapter.saveEvents
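The all-or-nothing behavior described for saveEvents() follows the usual BEGIN / COMMIT / ROLLBACK pattern. The sketch below illustrates that pattern in general terms and is not taken from the adapter's source. QueryClient is a hypothetical structural type shaped like a database client with a query method, and inTransaction is an illustrative helper name.

```typescript
// Hypothetical structural type; only query() is needed for this sketch.
interface QueryClient {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Runs `work` between BEGIN and COMMIT. If `work` throws, the transaction is
// rolled back and the original error is rethrown to the caller.
async function inTransaction<T>(
  client: QueryClient,
  work: (client: QueryClient) => Promise<T>,
): Promise<T> {
  await client.query("BEGIN");
  try {
    const result = await work(client);
    await client.query("COMMIT");
    return result;
  } catch (error) {
    await client.query("ROLLBACK");
    throw error;
  }
}
```

With this shape, a failure while inserting any single event leaves none of the batch in the table, which matches the rollback behavior documented above.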
getEvents()
getEvents(): Promise<FormSubmission[]>
Defined in: storage/PostgresEventStorageAdapter.ts:252
Retrieves all FormSubmission events for the current tenant from the event store.
Events are mapped to ensure timestamp is a valid ISO string.
Returns
Promise<FormSubmission[]>
A Promise that resolves with an array of all FormSubmission events.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEvents
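The note that getEvents() maps each timestamp to a valid ISO string can be pictured with a small normalizer. This is only a sketch of the idea and the adapter's actual mapping may differ. The helper name toIsoTimestamp is hypothetical.

```typescript
// Hypothetical helper: coerces a Date or a date string into an ISO 8601
// string, rejecting values that do not parse to a valid date.
function toIsoTimestamp(value: string | Date): string {
  const date = value instanceof Date ? value : new Date(value);
  if (Number.isNaN(date.getTime())) {
    throw new RangeError(`Invalid timestamp: ${String(value)}`);
  }
  return date.toISOString();
}
```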
saveAuditLog()
saveAuditLog(entries): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:286
Saves an array of AuditLogEntry entries to the audit log store.
Entries are saved within a transaction to ensure atomicity.
Parameters
entries
An array of AuditLogEntry objects to save.
Returns
Promise<void>
A Promise that resolves when the audit log entries are successfully saved.
Throws
If the database transaction fails during the save operation.
Implementation of
EventStorageAdapter.saveAuditLog
getAuditLog()
getAuditLog(): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:321
Retrieves all AuditLogEntry entries for the current tenant from the audit log store.
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of all AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditLog
saveMerkleRoot()
saveMerkleRoot(root): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:352
Saves the Merkle root to the Merkle root store.
If a Merkle root already exists for the tenant, this operation will do nothing (ON CONFLICT DO NOTHING).
Parameters
root
string
The Merkle root string to save.
Returns
Promise<void>
A Promise that resolves when the Merkle root is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.saveMerkleRoot
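The ON CONFLICT DO NOTHING behavior means the first root saved for a tenant wins and later calls are silently ignored. The in-memory model below only illustrates that documented contract, together with the empty-string default of getMerkleRoot(). MerkleRootModel is a hypothetical class and does not reflect how the adapter is implemented.

```typescript
// Hypothetical in-memory model of the documented save/get contract.
class MerkleRootModel {
  private readonly roots = new Map<string, string>();

  // Mirrors ON CONFLICT DO NOTHING: an existing root is left untouched.
  save(tenantId: string, root: string): void {
    if (!this.roots.has(tenantId)) {
      this.roots.set(tenantId, root);
    }
  }

  // Mirrors getMerkleRoot(): an empty string when no root is stored.
  get(tenantId: string): string {
    return this.roots.get(tenantId) ?? "";
  }
}
```

Callers that need to replace a stored root should therefore not assume that a second saveMerkleRoot() call overwrites the first.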
getMerkleRoot()
getMerkleRoot(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:375
Retrieves the latest stored Merkle root for the current tenant.
Returns
Promise<string>
A Promise that resolves with the Merkle root string, or an empty string if no root exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getMerkleRoot
updateEventSyncLevel()
updateEventSyncLevel(id, syncLevel): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:395
Updates the syncLevel for a specific event identified by its GUID.
Parameters
id
string
The GUID of the event to update.
syncLevel
The new SyncLevel to set for the event.
Returns
Promise<void>
A Promise that resolves when the event's sync level is updated.
Throws
If the database update fails.
Implementation of
EventStorageAdapter.updateEventSyncLevel
updateAuditLogSyncLevel()
updateAuditLogSyncLevel(entityGuid, syncLevel): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:412
Updates the syncLevel for audit log entries associated with a given entity GUID.
Parameters
entityGuid
string
The GUID of the entity whose associated audit log entries' sync levels need to be updated.
syncLevel
The new SyncLevel to set for the audit log entries.
Returns
Promise<void>
A Promise that resolves when the update is complete.
Throws
If the database update fails.
Implementation of
EventStorageAdapter.updateAuditLogSyncLevel
getEventsSince()
getEventsSince(timestamp): Promise<FormSubmission[]>
Defined in: storage/PostgresEventStorageAdapter.ts:430
Retrieves FormSubmission events that have occurred since a specified timestamp.
Events are ordered by timestamp in ascending order (oldest first).
Parameters
timestamp
string | Date
The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).
Returns
Promise<FormSubmission[]>
A Promise that resolves with an array of FormSubmission events.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEventsSince
getEventsSincePagination()
getEventsSincePagination(timestamp, limit): Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>
Defined in: storage/PostgresEventStorageAdapter.ts:463
Retrieves FormSubmission events that have occurred since a specified timestamp with pagination support.
Events are ordered by timestamp in ascending order (oldest first) and limited by limit.
Parameters
timestamp
string | Date
The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).
limit
number = 100
The maximum number of events to retrieve in a single page. Defaults to 100.
Returns
Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>
A Promise that resolves with an object containing an array of FormSubmission events and the nextCursor for pagination.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEventsSincePagination
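A caller that wants every event after a given point can follow nextCursor page by page. The sketch below assumes that a null nextCursor marks the last page and that nextCursor can be passed back as the next timestamp argument. Neither is confirmed by this reference beyond the return type. EventLike, Page, PagedSource, and fetchAllSince are hypothetical names used only for illustration.

```typescript
// Minimal shapes for this sketch; the real FormSubmission has more fields.
interface EventLike {
  guid: string;
  timestamp: string;
}
interface Page {
  events: EventLike[];
  nextCursor: string | Date | null;
}
interface PagedSource {
  getEventsSincePagination(timestamp: string | Date, limit?: number): Promise<Page>;
}

// Collects all pages, starting at `since`, until the source returns a null cursor.
async function fetchAllSince(
  source: PagedSource,
  since: string | Date,
  pageSize = 100,
): Promise<EventLike[]> {
  const all: EventLike[] = [];
  let cursor: string | Date | null = since;
  while (cursor !== null) {
    const page: Page = await source.getEventsSincePagination(cursor, pageSize);
    all.push(...page.events);
    // Assumption: null means there is nothing further to fetch.
    cursor = page.nextCursor;
  }
  return all;
}
```

For large event logs it is usually preferable to process each page as it arrives rather than accumulate everything in memory as this sketch does.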
getAuditLogsSince()
getAuditLogsSince(timestamp): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:508
Retrieves AuditLogEntry entries that have occurred since a specified timestamp.
Audit logs are ordered by timestamp in descending order (newest first).
Parameters
timestamp
string
The ISO 8601 timestamp string from which to retrieve audit logs (exclusive).
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditLogsSince
updateSyncLevelFromEvents()
updateSyncLevelFromEvents(events): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:539
Updates the syncLevel for a batch of FormSubmission events based on their GUIDs.
This operation is performed within a transaction for atomicity.
Parameters
events
An array of FormSubmission objects, each containing the GUID and the new syncLevel.
Returns
Promise<void>
A Promise that resolves when all specified events' sync levels are updated.
Throws
If the database transaction fails during the update operation.
Implementation of
EventStorageAdapter.updateSyncLevelFromEvents
getLastRemoteSyncTimestamp()
getLastRemoteSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:565
Retrieves the timestamp of the last successful remote synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastRemoteSyncTimestamp
setLastRemoteSyncTimestamp()
setLastRemoteSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:587
Sets the timestamp of the last successful remote synchronization.
This operation deletes any existing remote sync timestamp for the tenant and inserts the new one.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastRemoteSyncTimestamp
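The remote sync timestamp pairs naturally with getEventsSince() for incremental synchronization. The following is a hypothetical workflow, not the library's sync manager. It relies on getEventsSince() returning events oldest first, and on the empty string returned when no timestamp has been stored. SyncStore, TimedEvent, pushPendingEvents, and the send callback are illustrative names.

```typescript
// Minimal shapes for this sketch.
interface TimedEvent {
  guid: string;
  timestamp: string;
}
interface SyncStore {
  getLastRemoteSyncTimestamp(): Promise<string>;
  setLastRemoteSyncTimestamp(timestamp: string): Promise<void>;
  getEventsSince(timestamp: string | Date): Promise<TimedEvent[]>;
}

// Sends events newer than the stored marker and then advances the marker.
// Returns the number of events sent.
async function pushPendingEvents(
  store: SyncStore,
  send: (events: TimedEvent[]) => Promise<void>, // hypothetical transport
): Promise<number> {
  const last = await store.getLastRemoteSyncTimestamp();
  // An empty string means no sync has been recorded, so start from the epoch.
  const since: string | Date = last === "" ? new Date(0) : last;
  const events = await store.getEventsSince(since);
  // Events arrive oldest first, so the final element is the newest.
  const newest = events[events.length - 1];
  if (newest === undefined) return 0;
  await send(events);
  // Advance the marker only after the send has succeeded.
  await store.setLastRemoteSyncTimestamp(newest.timestamp);
  return events.length;
}
```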
getLastLocalSyncTimestamp()
getLastLocalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:606
Retrieves the timestamp of the last successful local synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastLocalSyncTimestamp
setLastLocalSyncTimestamp()
setLastLocalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:628
Sets the timestamp of the last successful local synchronization.
This operation deletes any existing local sync timestamp for the tenant and inserts the new one.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastLocalSyncTimestamp
getLastPullExternalSyncTimestamp()
getLastPullExternalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:647
Retrieves the timestamp of the last successful external pull synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastPullExternalSyncTimestamp
setLastPullExternalSyncTimestamp()
setLastPullExternalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:669
Sets the timestamp of the last successful external pull synchronization.
This operation deletes any existing external pull sync timestamp for the tenant and inserts the new one.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastPullExternalSyncTimestamp
getLastPushExternalSyncTimestamp()
getLastPushExternalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:688
Retrieves the timestamp of the last successful external push synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastPushExternalSyncTimestamp
setLastPushExternalSyncTimestamp()
setLastPushExternalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:710
Sets the timestamp of the last successful external push synchronization.
This operation deletes any existing external push sync timestamp for the tenant and inserts the new one.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastPushExternalSyncTimestamp
isEventExisted()
isEventExisted(guid): Promise<boolean>
Defined in: storage/PostgresEventStorageAdapter.ts:730
Checks if an event with the given GUID exists in the event store for the current tenant.
Parameters
guid
string
The GUID of the event to check.
Returns
Promise<boolean>
A Promise that resolves to true if the event exists, false otherwise.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.isEventExisted
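One possible use of isEventExisted() is to drop already-stored events from an incoming batch before saving it. This is a sketch of that idea, with hypothetical names (HasGuid, ExistenceChecker, keepUnseen). Whether saveEvents() itself tolerates duplicate GUIDs is not stated in this reference.

```typescript
// Minimal shapes for this sketch.
interface HasGuid {
  guid: string;
}
interface ExistenceChecker {
  isEventExisted(guid: string): Promise<boolean>;
}

// Returns the events whose GUID is not yet present in the store,
// preserving their original order.
async function keepUnseen<T extends HasGuid>(
  store: ExistenceChecker,
  incoming: T[],
): Promise<T[]> {
  const unseen: T[] = [];
  for (const event of incoming) {
    if (!(await store.isEventExisted(event.guid))) {
      unseen.push(event);
    }
  }
  return unseen;
}
```

The loop issues one lookup per event, which is simple but may be slow for large batches.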
getAuditTrailByEntityGuid()
getAuditTrailByEntityGuid(entityGuid): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:752
Retrieves the audit trail for a specific entity, identified by its entityGuid.
Audit log entries are ordered by timestamp in descending order (newest first).
Parameters
entityGuid
string
The GUID of the entity to retrieve the audit trail for.
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditTrailByEntityGuid
clearStore()
clearStore(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:780
Clears all events, audit logs, Merkle roots, and sync timestamps for the current tenant from the store.
Returns
Promise<void>
A Promise that resolves when all data is cleared.
Throws
If the database deletion fails.