Data Pump
The Flowcore Data Pump is a TypeScript client for streaming and processing events from the Flowcore platform. It provides reliable, high-performance event handling with automatic retry, buffering, and state management.
Key Features
- Time bucket polling — efficiently fetches events organized into hourly buckets
- State management — resume processing from where you left off
- Buffering — in-memory queue between fetching and processing with backpressure handling
- Automatic retry — exponential backoff with configurable redelivery limits
- Push and pull modes — automatic lifecycle management or full manual control
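The retry feature combines exponential backoff with a redelivery limit. The pump's actual base delay, multiplier, and cap are not documented here, so the sketch below assumes a hypothetical 1 second base that doubles on each attempt and is capped at 30 seconds. `backoffDelay` and `backoffSchedule` are illustrative names, not exports of `@flowcore/data-pump`.

```typescript
// Hypothetical sketch of exponential backoff bounded by a redelivery limit.
// BASE_DELAY_MS and MAX_DELAY_MS are assumed values, not the pump's real settings.
const BASE_DELAY_MS = 1000
const MAX_DELAY_MS = 30000

// Delay before the n-th redelivery (n starts at 1): base * 2^(n - 1), capped.
function backoffDelay(attempt: number): number {
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS)
}

// Waits a persistently failing event would see before it is handed to
// failedHandler (assumed to happen after maxRedeliveryCount redeliveries).
function backoffSchedule(maxRedeliveryCount: number): number[] {
  const delays: number[] = []
  for (let attempt = 1; attempt <= maxRedeliveryCount; attempt++) {
    delays.push(backoffDelay(attempt))
  }
  return delays
}

console.log(backoffSchedule(3)) // [1000, 2000, 4000]
```

Production retry policies often add random jitter so that many failing consumers do not retry in lockstep. Jitter is left out here to keep the schedule deterministic.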
Installation
```bash
bun add @flowcore/data-pump
# or
npm install @flowcore/data-pump
```

Basic Setup
Create a Data Pump instance using FlowcoreDataPump.create():
```typescript
import { FlowcoreDataPump } from "@flowcore/data-pump"

const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1", "event-2"],
  },
  stateManager: {
    getState: () => null, // null: start in live mode (new events only)
    setState: (state) => console.log("Position:", state),
  },
  processor: {
    concurrency: 5,
    handler: async (events) => {
      await processEvents(events) // processEvents is your own function
    },
  },
  notifier: { type: "websocket" },
  bufferSize: 100,
  maxRedeliveryCount: 3,
})
```

Push Mode
Push mode is the recommended approach. The pump automatically manages the reserve, process, and acknowledge lifecycle for you.
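The lifecycle described above works as a loop. The pump reserves a batch, runs the handler, and acknowledges the batch on success. On failure it redelivers the events until they have been retried maxRedeliveryCount times, and then passes them to failedHandler. The sketch below is a hypothetical in-memory model of that loop, not the pump's implementation. `PumpEvent`, `runPushCycle`, and `drain` are illustrative names. It also assumes two behaviors: a thrown handler error fails the whole batch, and maxRedeliveryCount counts retries after the first failure.

```typescript
// Hypothetical in-memory model of the push-mode lifecycle (not the real pump).
interface PumpEvent {
  id: string
}

interface CycleOutcome {
  acknowledged: string[] // ids that would be acknowledged
  redeliver: PumpEvent[] // events to retry in a later cycle
  permanentlyFailed: PumpEvent[] // events that would go to failedHandler
}

// Runs the handler once for a batch and decides what happens to each event.
// Assumption: a thrown error fails every event in the batch.
async function runPushCycle(
  batch: PumpEvent[],
  failures: Map<string, number>,
  handler: (events: PumpEvent[]) => Promise<void>,
  maxRedeliveryCount: number,
): Promise<CycleOutcome> {
  try {
    await handler(batch)
    return { acknowledged: batch.map((e) => e.id), redeliver: [], permanentlyFailed: [] }
  } catch {
    const outcome: CycleOutcome = { acknowledged: [], redeliver: [], permanentlyFailed: [] }
    for (const event of batch) {
      const count = (failures.get(event.id) ?? 0) + 1
      failures.set(event.id, count)
      // The first failure is not a retry, so count - 1 retries have already run.
      if (count > maxRedeliveryCount) outcome.permanentlyFailed.push(event)
      else outcome.redeliver.push(event)
    }
    return outcome
  }
}

// Repeats cycles until nothing is left to redeliver; returns the number of handler calls.
async function drain(
  batch: PumpEvent[],
  handler: (events: PumpEvent[]) => Promise<void>,
  maxRedeliveryCount: number,
  failedHandler: (events: PumpEvent[]) => Promise<void>,
): Promise<number> {
  const failures = new Map<string, number>()
  let pending = batch
  let calls = 0
  while (pending.length > 0) {
    const outcome = await runPushCycle(pending, failures, handler, maxRedeliveryCount)
    calls++
    if (outcome.permanentlyFailed.length > 0) await failedHandler(outcome.permanentlyFailed)
    pending = outcome.redeliver
  }
  return calls
}
```

Under these assumptions, a handler that always throws is called 1 + maxRedeliveryCount times before the event reaches failedHandler.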
```typescript
import { FlowcoreDataPump } from "@flowcore/data-pump"

const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1"],
  },
  stateManager: {
    getState: () => null,
    setState: (state) => {
      // Persist state for resume capability
    },
  },
  processor: {
    concurrency: 5,
    handler: async (events) => {
      // Your business logic here
      await processEvents(events)
      // Pump auto-acknowledges on success
    },
    failedHandler: async (failedEvents) => {
      // Handle events that exceeded maxRedeliveryCount
      console.error("Permanent failures:", failedEvents)
    },
  },
  notifier: { type: "websocket" },
})
```

Pull Mode
Pull mode gives you full manual control over the event lifecycle. Omit the processor configuration to enable it.
```typescript
import { FlowcoreDataPump } from "@flowcore/data-pump"

// No processor is configured, so the pump runs in pull mode
const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1"],
  },
  stateManager: {
    getState: () => null,
    setState: (state) => {
      // Persist state
    },
  },
  notifier: { type: "websocket" },
})
```
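Promise.allSettled returns its results in the same order as its input. Each result must therefore be matched to its event by position before the batch is split into acknowledge and fail lists. Filtering the results first and then indexing into the events array pairs the wrong IDs. The helper below is a hypothetical sketch. `partitionSettled` is not part of `@flowcore/data-pump`, and the helper assumes each event exposes an `id` field, as the manual lifecycle code in this section does.

```typescript
// Hypothetical helper: split a reserved batch into acknowledge / fail id lists.
// Promise.allSettled preserves input order, so results[i] belongs to events[i].
function partitionSettled<E extends { id: string }>(
  events: E[],
  results: PromiseSettledResult<unknown>[],
): { successful: string[]; failed: string[] } {
  if (events.length !== results.length) {
    throw new Error("events and results must have the same length")
  }
  const successful: string[] = []
  const failed: string[] = []
  events.forEach((event, i) => {
    if (results[i]?.status === "fulfilled") successful.push(event.id)
    else failed.push(event.id)
  })
  return { successful, failed }
}

// Example: the first of three events was rejected.
const demo = partitionSettled(
  [{ id: "e1" }, { id: "e2" }, { id: "e3" }],
  [
    { status: "rejected", reason: new Error("boom") },
    { status: "fulfilled", value: undefined },
    { status: "fulfilled", value: undefined },
  ],
)
console.log(demo) // successful: ["e2", "e3"], failed: ["e1"]
```

In this example, filtering the results before indexing into `events` would report e1 and e2 as successful. Pairing by position correctly reports e2 and e3.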
```typescript
// Manually control the lifecycle
const events = await dataPump.reserve(10)

const results = await Promise.allSettled(
  events.map((e) => processEvent(e)), // processEvent is your own function
)

// results[i] belongs to events[i], so pair them by position before filtering
const successful = events
  .filter((_, i) => results[i].status === "fulfilled")
  .map((e) => e.id)

const failed = events
  .filter((_, i) => results[i].status === "rejected")
  .map((e) => e.id)

await dataPump.acknowledge(successful)
await dataPump.fail(failed)
```

State Management
The state manager controls where the pump starts reading events. Return null to start in live mode (new events only), or return a state object to resume from a specific position.
State Format
```typescript
interface FlowcoreDataPumpState {
  timeBucket: string // "yyyyMMddHH0000" format
  eventId?: string // Optional event ID, to resume at a precise position within the bucket
}
```

Time buckets use the format yyyyMMddHH0000. For example, 20240315140000 represents March 15, 2024 at 2:00 PM.
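The format is fixed-width and runs from year down to hour, so bucket strings sort lexicographically in chronological order. Truncating a timestamp to its hour yields its bucket. The helpers below are a hypothetical sketch: `toTimeBucket`, `fromTimeBucket`, and `startBucket` are illustrative names, not pump exports, and the sketch assumes buckets are expressed in UTC. Confirm the time zone against the Flowcore platform before relying on it. `startBucket` follows the getState rule described above, assuming that live mode begins in the current hour's bucket.

```typescript
// Hypothetical helpers for the "yyyyMMddHH0000" time bucket format.
// Assumption: buckets are expressed in UTC.
interface FlowcoreDataPumpState {
  timeBucket: string
  eventId?: string
}

const pad = (n: number, width = 2): string => String(n).padStart(width, "0")

// Truncates a Date to its hour and formats it as a time bucket.
function toTimeBucket(date: Date): string {
  return (
    pad(date.getUTCFullYear(), 4) +
    pad(date.getUTCMonth() + 1) +
    pad(date.getUTCDate()) +
    pad(date.getUTCHours()) +
    "0000"
  )
}

// Parses a time bucket back into the Date at the start of that hour.
function fromTimeBucket(bucket: string): Date {
  if (!/^\d{10}0000$/.test(bucket)) throw new Error(`invalid time bucket: ${bucket}`)
  const year = Number(bucket.slice(0, 4))
  const month = Number(bucket.slice(4, 6))
  const day = Number(bucket.slice(6, 8))
  const hour = Number(bucket.slice(8, 10))
  return new Date(Date.UTC(year, month - 1, day, hour))
}

// null: start live from the current hour (assumed); otherwise resume from the stored bucket.
function startBucket(state: FlowcoreDataPumpState | null, now: Date = new Date()): string {
  return state === null ? toTimeBucket(now) : state.timeBucket
}

console.log(toTimeBucket(new Date(Date.UTC(2024, 2, 15, 14, 37, 5)))) // 20240315140000
```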
Memory-Based (Development)
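```typescript
import { FlowcoreDataPump, type FlowcoreDataPumpState } from "@flowcore/data-pump"

// In-memory state is lost on restart, so this is only suitable for development
let currentState: FlowcoreDataPumpState | null = null

const dataPump = FlowcoreDataPump.create({
  // ...other config
  stateManager: {
    getState: () => currentState,
    setState: (state) => {
      currentState = state
    },
  },
})
```

Database-Based (Production)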
```typescript
// `db` stands for your own database client; the SQL below uses SQLite syntax
const dataPump = FlowcoreDataPump.create({
  // ...other config
  stateManager: {
    getState: async () => {
      const result = await db.query(
        "SELECT time_bucket, event_id FROM pump_state WHERE id = ?",
        ["main"],
      )
      return result[0]
        ? { timeBucket: result[0].time_bucket, eventId: result[0].event_id ?? undefined }
        : null
    },
    setState: async (state) => {
      await db.query(
        "INSERT OR REPLACE INTO pump_state (id, time_bucket, event_id) VALUES (?, ?, ?)",
        ["main", state.timeBucket, state.eventId ?? null],
      )
    },
  },
})
```

Authentication
API Key
```typescript
auth: {
  apiKey: "your-api-key",
}
```

OIDC / Bearer Token
```typescript
auth: {
  // `oidc` stands for your own OIDC client
  getBearerToken: () => oidc.getToken().then((token) => token.accessToken),
}
```

Configuration Reference
| Option | Type | Description |
|---|---|---|
| auth | object | API key or bearer token authentication (required) |
| dataSource | object | Tenant, data core, flow type, and event types to consume (required) |
| stateManager | object | getState / setState callbacks for position tracking (required) |
| processor | object | Push mode handler and concurrency config (omit for pull mode) |
| notifier | object | Notification transport, e.g. { type: "websocket" } |
| bufferSize | number | In-memory buffer capacity between fetching and processing |
| maxRedeliveryCount | number | Maximum retry attempts before an event is sent to failedHandler |
| noTranslation | boolean | Skip event payload translation when true |
| processor.concurrency | number | Number of events processed in parallel (push mode only) |
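bufferSize bounds the queue between fetching and processing. When the queue is full, the fetcher has to pause (backpressure) instead of consuming more and more memory. The class below is a hypothetical synchronous sketch of that idea, not the pump's internal buffer. `BoundedBuffer`, `offer`, and `take` are illustrative names, and the sketch assumes the capacity counts events.

```typescript
// Hypothetical bounded buffer illustrating bufferSize and backpressure.
class BoundedBuffer<T> {
  private readonly items: T[] = []
  private readonly capacity: number

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new Error("capacity must be a positive integer")
    }
    this.capacity = capacity
  }

  // Producer side: returns false when full, signalling the fetcher to pause.
  offer(item: T): boolean {
    if (this.items.length >= this.capacity) return false
    this.items.push(item)
    return true
  }

  // Consumer side: removes up to `max` items in FIFO order, freeing space.
  take(max: number): T[] {
    return this.items.splice(0, max)
  }

  get size(): number {
    return this.items.length
  }

  get isFull(): boolean {
    return this.items.length >= this.capacity
  }
}

// With capacity 3, the fourth offer is refused until the processor takes items.
const buffer = new BoundedBuffer<string>(3)
console.log(["e1", "e2", "e3", "e4"].map((id) => buffer.offer(id))) // [true, true, true, false]
console.log(buffer.take(2)) // ["e1", "e2"]
console.log(buffer.offer("e4")) // true, because space was freed
```

A real fetcher would typically wait (for example on a promise resolved by `take`) rather than poll `offer`. The boolean return keeps this sketch synchronous and easy to follow.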