@idpass/data-collect-core / PostgresEventStorageAdapter
Class: PostgresEventStorageAdapter
Defined in: storage/PostgresEventStorageAdapter.ts:93
PostgreSQL implementation of the EventStorageAdapter for server-side event persistence.
This adapter provides scalable, tamper-evident event storage using PostgreSQL. It is designed for production server deployments requiring robust data persistence and efficient event sourcing operations.
Key features:
- ACID Transactions: Full PostgreSQL transaction support for data consistency.
- Multi-Tenant Support: Complete tenant isolation using tenant_id partitioning.
- Immutable Event Storage: All events are stored as immutable records.
- Audit Trail Management: Comprehensive audit logging for compliance and debugging.
- Sync Timestamp Management: Tracks timestamps for various synchronization operations (local, remote, external).
- Scalable Architecture: Designed for production workloads with proper indexing.
Architecture:
- Uses PostgreSQL connection pooling for performance and scalability.
- Stores events and audit logs as JSONB documents for flexible schema evolution.
- Implements tenant isolation at the database level for all event-related data.
- Provides optimized queries with proper indexing strategies for efficient retrieval.
Database Schema Overview:
- events: Stores FormSubmission records with guid as primary key, entity_guid, timestamp, and sync_level.
- audit_log: Stores AuditLogEntry records with id as primary key, entity_guid, event_guid, and timestamp.
- sync_metadata: Key-value table for storing sync timestamps and other metadata, keyed by (tenant_id, key).
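To make the three tables more concrete, the row shapes can be sketched as TypeScript types. The column names guid, entity_guid, event_guid, timestamp, sync_level, id, tenant_id, and key come from the overview above; the column types, the `data` and `value` fields, and the helpers `syncMetadataKey` and `upsertSyncMetadata` are assumptions made for illustration and do not describe the adapter's actual schema.

```typescript
// Hypothetical row shapes; column types are assumptions, not the real DDL.
interface EventRow {
  guid: string; // primary key
  entity_guid: string;
  timestamp: string; // assumed to be stored as ISO 8601 text
  sync_level: number; // assumed numeric encoding of SyncLevel
  tenant_id: string; // assumed column backing tenant isolation
  data: unknown; // assumed JSONB document column
}

interface AuditLogRow {
  id: string; // primary key
  entity_guid: string;
  event_guid: string;
  timestamp: string;
  tenant_id: string; // assumed, as above
}

interface SyncMetadataRow {
  tenant_id: string;
  key: string;
  value: string; // assumed value column
}

// sync_metadata is keyed by (tenant_id, key), so the same key can exist
// once per tenant. This helper builds an unambiguous composite key.
function syncMetadataKey(tenantId: string, key: string): string {
  return JSON.stringify([tenantId, key]);
}

// In-memory stand-in for an upsert on the composite key.
function upsertSyncMetadata(
  store: Map<string, SyncMetadataRow>,
  row: SyncMetadataRow,
): void {
  store.set(syncMetadataKey(row.tenant_id, row.key), row);
}
```

Writing the same key for two tenants yields two rows, while writing it twice for one tenant overwrites the earlier value, which is the behavior a composite key implies.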
Examples
Basic server setup:
import { PostgresEventStorageAdapter } from '@idpass/data-collect-core';

const adapter = new PostgresEventStorageAdapter(
  'postgresql://user:pass@localhost:5432/datacollect',
  'tenant-123'
);
await adapter.initialize();

// Save events
const eventsToSave = [
  {
    guid: 'event-1',
    entityGuid: 'entity-1',
    timestamp: new Date().toISOString(),
    type: 'create-entity',
    data: {},
  },
];
await adapter.saveEvents(eventsToSave);

// Retrieve events
const allEvents = await adapter.getEvents();
console.log('All events:', allEvents);
Production connection configuration:
import { PostgresEventStorageAdapter } from '@idpass/data-collect-core';

const adapter = new PostgresEventStorageAdapter(
  'postgresql://event_user:secure_pass@db.example.com:5432/eventstore_prod?sslmode=require',
  process.env.TENANT_ID
);

// Fail fast if the schema cannot be created or the database is unreachable
try {
  await adapter.initialize();
  console.log('Event storage initialized successfully');
} catch (error) {
  console.error('Event storage initialization failed:', error);
  process.exit(1);
}
Implements
EventStorageAdapter
Constructors
Constructor
new PostgresEventStorageAdapter(connectionString, tenantId?): PostgresEventStorageAdapter
Defined in: storage/PostgresEventStorageAdapter.ts:99
Parameters
connectionString
string
tenantId?
string
Returns
PostgresEventStorageAdapter
Constructor
new PostgresEventStorageAdapter(pool, tenantId?): PostgresEventStorageAdapter
Defined in: storage/PostgresEventStorageAdapter.ts:104
Creates a PostgresEventStorageAdapter using an existing Pool. When constructed this way, the adapter does NOT own the pool and will not close it.
Parameters
pool
Pool
tenantId?
string
Returns
PostgresEventStorageAdapter
Methods
getPool()
getPool(): Pool
Defined in: storage/PostgresEventStorageAdapter.ts:121
Returns the underlying pg Pool used by this adapter. Useful for creating transactional EDM stacks that share a single connection pool.
Returns
Pool
setDrizzleInstance()
setDrizzleInstance(db): void
Defined in: storage/PostgresEventStorageAdapter.ts:132
Replaces the internal Drizzle database instance. Used to inject a Drizzle transaction object so that all operations on this adapter participate in an external transaction.
Parameters
db
A Drizzle database or transaction instance.
Returns
void
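A minimal sketch of how a transaction object might be injected and then swapped back out. The `transaction(callback)` shape mirrors Drizzle's `db.transaction`; the helper name `withAdapterTransaction`, the structural interfaces, and the decision to restore the original instance afterwards are assumptions for illustration, not documented behavior of the adapter.

```typescript
// Minimal structural types; the real adapter and Drizzle types are richer.
interface DrizzleAware {
  setDrizzleInstance(db: unknown): void;
}

interface TransactionRunner<Tx> {
  // Same shape as Drizzle's db.transaction(callback).
  transaction<T>(fn: (tx: Tx) => Promise<T>): Promise<T>;
}

// Runs `work` while the adapter is bound to the transaction object, then
// rebinds the adapter to the original database instance, even on failure.
async function withAdapterTransaction<Tx, T>(
  db: TransactionRunner<Tx>,
  adapter: DrizzleAware,
  work: () => Promise<T>,
): Promise<T> {
  return db.transaction(async (tx) => {
    adapter.setDrizzleInstance(tx);
    try {
      return await work();
    } finally {
      // Assumption: the adapter should not keep a reference to a finished
      // transaction, so the original instance is restored here.
      adapter.setDrizzleInstance(db);
    }
  });
}
```

Because the method replaces the adapter's internal instance, concurrent callers sharing one adapter would also see the transaction object. Using a dedicated adapter per transaction is likely the safer pattern.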
closeConnection()
closeConnection(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:143
Closes all connections in the PostgreSQL connection pool, but only if this adapter owns the pool (that is, it was created from a connection string). When the adapter was constructed with an external Pool, the call leaves the pool open and the caller is responsible for the pool's lifecycle.
Returns
Promise<void>
A Promise that resolves when the connection is closed.
Implementation of
EventStorageAdapter.closeConnection
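The ownership rule above suggests a shutdown order when several adapters share one external pool: close each adapter, then end the pool once. The sketch below assumes only that adapters expose `closeConnection()` and that the pool exposes `end()`, as pg's Pool does; `shutdownSharedPool` and the two interfaces are hypothetical names.

```typescript
// Structural stand-ins so the sketch does not depend on pg or the adapter.
interface Closable {
  closeConnection(): Promise<void>;
}

interface EndablePool {
  end(): Promise<void>;
}

async function shutdownSharedPool(
  adapters: Closable[],
  pool: EndablePool,
): Promise<void> {
  // For adapters built on an external pool, closeConnection() is documented
  // as leaving the pool open, so calling it on each adapter is safe.
  for (const adapter of adapters) {
    await adapter.closeConnection();
  }
  // The caller owns the pool and ends it exactly once, after all adapters.
  await pool.end();
}
```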
initialize()
initialize(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:162
Initializes the PostgreSQL database with required tables and schemas for events and audit logs.
Creates:
- events table with guid as primary key and various indexes for efficient querying.
- audit_log table for storing audit trail entries.
- sync_metadata table for tracking different synchronization timestamps.
This method is idempotent and safe to call multiple times.
Returns
Promise<void>
A Promise that resolves when the database is successfully initialized.
Throws
When the database connection or table creation fails.
Implementation of
EventStorageAdapter.initialize
saveEvents()
saveEvents(eventList): Promise<string[]>
Defined in: storage/PostgresEventStorageAdapter.ts:232
Saves an array of FormSubmission events to the event store.
Events are saved within a transaction to ensure atomicity. If any event fails to save, the entire transaction is rolled back.
Parameters
eventList
An array of FormSubmission objects to save.
Returns
Promise<string[]>
A Promise that resolves with an array of GUIDs of the successfully saved events.
Throws
If the database transaction fails during the save operation.
Implementation of
EventStorageAdapter.saveEvents
getEvents()
getEvents(): Promise<FormSubmission[]>
Defined in: storage/PostgresEventStorageAdapter.ts:266
Retrieves all FormSubmission events for the current tenant from the event store.
Events are mapped to ensure timestamp is a valid ISO string.
Returns
Promise<FormSubmission[]>
A Promise that resolves with an array of all FormSubmission events.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEvents
saveAuditLog()
saveAuditLog(entries): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:306
Saves an array of AuditLogEntry entries to the audit log store.
Entries are saved within a transaction to ensure atomicity.
Parameters
entries
An array of AuditLogEntry objects to save.
Returns
Promise<void>
A Promise that resolves when the audit log entries are successfully saved.
Throws
If the database transaction fails during the save operation.
Implementation of
EventStorageAdapter.saveAuditLog
getAuditLog()
getAuditLog(): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:330
Retrieves all AuditLogEntry entries for the current tenant from the audit log store.
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of all AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditLog
updateEventSyncLevel()
updateEventSyncLevel(id, syncLevel): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:365
Updates the syncLevel for a specific event identified by its GUID.
Parameters
id
string
The GUID of the event to update.
syncLevel
The new SyncLevel to set for the event.
Returns
Promise<void>
A Promise that resolves when the event's sync level is updated.
Throws
If the database update fails.
Implementation of
EventStorageAdapter.updateEventSyncLevel
updateAuditLogSyncLevel()
updateAuditLogSyncLevel(entityGuid, syncLevel): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:377
Updates the syncLevel for audit log entries associated with a given entity GUID.
Parameters
entityGuid
string
The GUID of the entity whose audit log entries should have their sync level updated.
syncLevel
The new SyncLevel to set for the audit log entries.
Returns
Promise<void>
A Promise that resolves when the update is complete.
Throws
If the database update fails.
Implementation of
EventStorageAdapter.updateAuditLogSyncLevel
getEventsSince()
getEventsSince(timestamp): Promise<FormSubmission[]>
Defined in: storage/PostgresEventStorageAdapter.ts:394
Retrieves FormSubmission events that have occurred since a specified timestamp.
Events are ordered by timestamp in ascending order (oldest first).
Parameters
timestamp
string | Date
The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).
Returns
Promise<FormSubmission[]>
A Promise that resolves with an array of FormSubmission events.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEventsSince
getEventsSincePagination()
getEventsSincePagination(timestamp, limit?): Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>
Defined in: storage/PostgresEventStorageAdapter.ts:435
Retrieves FormSubmission events that have occurred since a specified timestamp with pagination support.
Events are ordered by timestamp in ascending order (oldest first) and limited by limit.
Parameters
timestamp
string | Date
The timestamp (ISO 8601 string or Date object) from which to retrieve events (exclusive).
limit?
number = 100
The maximum number of events to retrieve in a single page. Defaults to 100.
Returns
Promise<{ events: FormSubmission[]; nextCursor: string | Date | null; }>
A Promise that resolves with an object containing an array of FormSubmission events and the nextCursor for pagination.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getEventsSincePagination
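A sketch of how a caller might walk through all pages. It assumes that `nextCursor` is meant to be passed back as the `timestamp` of the next call and that `null` (or an empty page) signals the end; neither is spelled out in the reference above. `forEachEventPage` and the structural types are hypothetical.

```typescript
type Cursor = string | Date;

interface Page<E> {
  events: E[];
  nextCursor: Cursor | null;
}

// Structural view of the one method this sketch needs.
interface PagedEventSource<E> {
  getEventsSincePagination(timestamp: Cursor, limit?: number): Promise<Page<E>>;
}

// Calls onPage for each non-empty page and returns the total event count.
async function forEachEventPage<E>(
  source: PagedEventSource<E>,
  since: Cursor,
  onPage: (events: E[]) => Promise<void> | void,
  pageSize = 100,
): Promise<number> {
  let cursor: Cursor | null = since;
  let total = 0;
  while (cursor !== null) {
    const page: Page<E> = await source.getEventsSincePagination(cursor, pageSize);
    // Assumption: an empty page means there is nothing further to read.
    if (page.events.length === 0) break;
    await onPage(page.events);
    total += page.events.length;
    cursor = page.nextCursor;
  }
  return total;
}
```

Processing page by page keeps memory bounded to `pageSize` events, in contrast to getEventsSince(), which returns everything in one array.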
getAuditLogsSince()
getAuditLogsSince(timestamp): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:502
Retrieves AuditLogEntry entries that have occurred since a specified timestamp.
Audit logs are ordered by timestamp in descending order (newest first).
Parameters
timestamp
string
The ISO 8601 timestamp string from which to retrieve audit logs (exclusive).
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditLogsSince
updateSyncLevelFromEvents()
updateSyncLevelFromEvents(eventList): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:538
Updates the syncLevel for a batch of FormSubmission events based on their GUIDs.
This operation is performed within a transaction for atomicity.
Parameters
eventList
An array of FormSubmission objects, each containing the GUID and the new syncLevel.
Returns
Promise<void>
A Promise that resolves when all specified events' sync levels are updated.
Throws
If the database transaction fails during the update operation.
Implementation of
EventStorageAdapter.updateSyncLevelFromEvents
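As an illustration, a caller could stamp a batch of events with a new level and submit them in one call. The sketch assumes each event carries its level in a `syncLevel` property next to `guid`; the `SyncLevel` values are not listed in this reference, so the level type is left generic. `markEventsAtLevel` is a hypothetical helper.

```typescript
// Structural view of the one method this sketch needs.
interface SyncLevelUpdater<E> {
  updateSyncLevelFromEvents(eventList: E[]): Promise<void>;
}

async function markEventsAtLevel<L, E extends { guid: string; syncLevel?: L }>(
  adapter: SyncLevelUpdater<E>,
  events: E[],
  level: L,
): Promise<void> {
  if (events.length === 0) return; // nothing to update, skip the transaction
  // Copy each event instead of mutating the caller's objects.
  const updated = events.map((event) => ({ ...event, syncLevel: level }));
  await adapter.updateSyncLevelFromEvents(updated);
}
```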
getLastRemoteSyncTimestamp()
getLastRemoteSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:555
Retrieves the timestamp of the last successful remote synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastRemoteSyncTimestamp
setLastRemoteSyncTimestamp()
setLastRemoteSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:568
Sets the timestamp of the last successful remote synchronization.
Uses an UPSERT to insert or update the timestamp in the sync_metadata table.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastRemoteSyncTimestamp
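One plausible way to combine the remote sync timestamp with getEventsSince() is an incremental push loop, sketched below. The `push` callback is a hypothetical transport, falling back to the Unix epoch when the stored timestamp is an empty string is an assumption, and the sketch relies on the documented oldest-first ordering of getEventsSince().

```typescript
interface RemoteSyncStore<E extends { timestamp: string }> {
  getLastRemoteSyncTimestamp(): Promise<string>;
  setLastRemoteSyncTimestamp(timestamp: string): Promise<void>;
  getEventsSince(timestamp: string | Date): Promise<E[]>;
}

// Assumed starting point when no sync has happened yet.
const EPOCH = "1970-01-01T00:00:00.000Z";

// Pushes events newer than the last remote sync and returns how many were sent.
async function pushPendingEvents<E extends { timestamp: string }>(
  store: RemoteSyncStore<E>,
  push: (events: E[]) => Promise<void>,
): Promise<number> {
  const last = await store.getLastRemoteSyncTimestamp();
  // An empty string means no timestamp has been stored yet.
  const since = last === "" ? EPOCH : last;
  const events = await store.getEventsSince(since);
  if (events.length === 0) return 0;
  await push(events);
  // Events arrive oldest first, so the last element carries the newest timestamp.
  // The timestamp is advanced only after the push succeeded.
  await store.setLastRemoteSyncTimestamp(events[events.length - 1].timestamp);
  return events.length;
}
```

Updating the timestamp only after a successful push means a failed push is retried from the same point on the next run.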
getLastLocalSyncTimestamp()
getLastLocalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:578
Retrieves the timestamp of the last successful local synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastLocalSyncTimestamp
setLastLocalSyncTimestamp()
setLastLocalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:591
Sets the timestamp of the last successful local synchronization.
Uses an UPSERT to insert or update the timestamp in the sync_metadata table.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastLocalSyncTimestamp
getLastPullExternalSyncTimestamp()
getLastPullExternalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:601
Retrieves the timestamp of the last successful external pull synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastPullExternalSyncTimestamp
setLastPullExternalSyncTimestamp()
setLastPullExternalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:614
Sets the timestamp of the last successful external pull synchronization.
Uses an UPSERT to insert or update the timestamp in the sync_metadata table.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastPullExternalSyncTimestamp
getLastPushExternalSyncTimestamp()
getLastPushExternalSyncTimestamp(): Promise<string>
Defined in: storage/PostgresEventStorageAdapter.ts:624
Retrieves the timestamp of the last successful external push synchronization.
Returns
Promise<string>
A Promise that resolves with the timestamp string, or an empty string if no timestamp exists.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getLastPushExternalSyncTimestamp
setLastPushExternalSyncTimestamp()
setLastPushExternalSyncTimestamp(timestamp): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:637
Sets the timestamp of the last successful external push synchronization.
Uses an UPSERT to insert or update the timestamp in the sync_metadata table.
Parameters
timestamp
string
The timestamp string to save.
Returns
Promise<void>
A Promise that resolves when the timestamp is successfully saved.
Throws
If the database operation fails.
Implementation of
EventStorageAdapter.setLastPushExternalSyncTimestamp
isEventExisted()
isEventExisted(guid): Promise<boolean>
Defined in: storage/PostgresEventStorageAdapter.ts:648
Checks if an event with the given GUID exists in the event store for the current tenant.
Parameters
guid
string
The GUID of the event to check.
Returns
Promise<boolean>
A Promise that resolves to true if the event exists, false otherwise.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.isEventExisted
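A typical use of this check is filtering an incoming batch before saving, sketched below. `saveOnlyNewEvents` is a hypothetical helper built only on isEventExisted() and saveEvents(). Note that check-then-save is not atomic, so two concurrent writers could both pass the check; whether saveEvents() itself rejects duplicate GUIDs is not stated in this reference.

```typescript
interface DedupStore<E> {
  isEventExisted(guid: string): Promise<boolean>;
  saveEvents(eventList: E[]): Promise<string[]>;
}

// Saves only events whose GUID is not yet stored; returns the saved GUIDs.
async function saveOnlyNewEvents<E extends { guid: string }>(
  store: DedupStore<E>,
  incoming: E[],
): Promise<string[]> {
  const fresh: E[] = [];
  const seenInBatch = new Set<string>();
  for (const event of incoming) {
    // Drop duplicates inside the batch itself before asking the store.
    if (seenInBatch.has(event.guid)) continue;
    seenInBatch.add(event.guid);
    if (!(await store.isEventExisted(event.guid))) {
      fresh.push(event);
    }
  }
  if (fresh.length === 0) return [];
  return store.saveEvents(fresh);
}
```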
getAuditTrailByEntityGuid()
getAuditTrailByEntityGuid(entityGuid): Promise<AuditLogEntry[]>
Defined in: storage/PostgresEventStorageAdapter.ts:666
Retrieves the audit trail for a specific entity, identified by its entityGuid.
Audit log entries are ordered by timestamp in descending order (newest first).
Parameters
entityGuid
string
The GUID of the entity to retrieve the audit trail for.
Returns
Promise<AuditLogEntry[]>
A Promise that resolves with an array of AuditLogEntry entries.
Throws
If the database query fails.
Implementation of
EventStorageAdapter.getAuditTrailByEntityGuid
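Because the trail is returned newest first, the most recent entry is the first element, and a chronological view needs a reversed copy. A small sketch, with `getAuditTimeline` as a hypothetical helper name:

```typescript
interface AuditTrailSource<A> {
  getAuditTrailByEntityGuid(entityGuid: string): Promise<A[]>;
}

async function getAuditTimeline<A>(
  source: AuditTrailSource<A>,
  entityGuid: string,
): Promise<{ latest: A | null; chronological: A[] }> {
  const newestFirst = await source.getAuditTrailByEntityGuid(entityGuid);
  return {
    // Index 0 is the most recent entry under the documented ordering.
    latest: newestFirst.length > 0 ? newestFirst[0] : null,
    // slice() copies first so the adapter's result is not reversed in place.
    chronological: newestFirst.slice().reverse(),
  };
}
```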
clearStore()
clearStore(): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:700
Clears all events, audit logs, and sync metadata for the current tenant from the store.
Returns
Promise<void>
A Promise that resolves when all data is cleared.
Throws
If the database deletion fails.
Implementation of
EventStorageAdapter.clearStore
persistHashAnchor()
persistHashAnchor(hash): Promise<void>
Defined in: storage/PostgresEventStorageAdapter.ts:712
Persists the latest hash anchor for tamper detection on restart.
Parameters
hash
string
The hash string to persist.
Returns
Promise<void>
A Promise that resolves when the hash is persisted.
Implementation of
EventStorageAdapter.persistHashAnchor
getPersistedHashAnchor()
getPersistedHashAnchor(): Promise<string | null>
Defined in: storage/PostgresEventStorageAdapter.ts:721
Retrieves the previously persisted hash anchor, or null if none exists.
Returns
Promise<string | null>
The persisted hash string, or null if no anchor has been saved.
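A sketch of a startup check built on the two hash anchor methods. How the current hash is recomputed from stored events is outside this reference, so it is passed in as a parameter; `checkHashAnchor`, the result labels, and the choice to persist an anchor on first run are assumptions for illustration.

```typescript
interface HashAnchorStore {
  persistHashAnchor(hash: string): Promise<void>;
  getPersistedHashAnchor(): Promise<string | null>;
}

type AnchorCheck = "no-anchor" | "match" | "mismatch";

// Compares a freshly recomputed hash with the anchor saved before restart.
async function checkHashAnchor(
  store: HashAnchorStore,
  recomputedHash: string,
): Promise<AnchorCheck> {
  const persisted = await store.getPersistedHashAnchor();
  if (persisted === null) {
    // Assumption: on first run there is nothing to compare against, so the
    // current hash becomes the initial anchor.
    await store.persistHashAnchor(recomputedHash);
    return "no-anchor";
  }
  // A mismatch suggests the stored events changed outside the normal path.
  return persisted === recomputedHash ? "match" : "mismatch";
}
```

A caller would likely refuse to start, or raise an alert, on "mismatch" rather than overwrite the anchor.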