Data Pump

The Flowcore Data Pump is a TypeScript client for streaming and processing events from the Flowcore platform. It provides reliable, high-performance event handling with automatic retry, buffering, and state management.

Key Features

  • Time bucket polling — efficiently fetches events organized into hourly buckets
  • State management — resume processing from where you left off
  • Buffering — in-memory queue between fetching and processing with backpressure handling
  • Automatic retry — exponential backoff with configurable redelivery limits
  • Push and pull modes — automatic lifecycle management or full manual control
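
To make the automatic retry feature concrete, the sketch below computes an exponential backoff schedule bounded by a redelivery limit. It is illustrative only: the function name, the base delay, and the cap are assumptions, and the pump's actual delay values are not documented on this page.

```typescript
// Hypothetical helper: not part of @flowcore/data-pump.
// Returns the delay (in ms) before each redelivery attempt, doubling
// each time and capped at maxDelayMs.
function retrySchedule(
  maxRedeliveryCount: number,
  baseDelayMs = 250,
  maxDelayMs = 10000,
): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < maxRedeliveryCount; attempt++) {
    delays.push(Math.min(baseDelayMs * 2 ** attempt, maxDelayMs))
  }
  return delays
}

// With a limit of 3 the delays are 250, 500, and 1000 ms
console.log(retrySchedule(3))
```

Once the schedule is exhausted, an event would count as a permanent failure, which corresponds to the point where the pump hands it to failedHandler.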

Installation

bun add @flowcore/data-pump

Basic Setup

Create a Data Pump instance using FlowcoreDataPump.create():

import { FlowcoreDataPump } from "@flowcore/data-pump"

const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1", "event-2"],
  },
  stateManager: {
    // Returning null starts in live mode (new events only)
    getState: () => null,
    setState: (state) => console.log("Position:", state),
  },
  processor: {
    concurrency: 5,
    handler: async (events) => {
      // processEvents is a placeholder for your own logic
      await processEvents(events)
    },
  },
  notifier: { type: "websocket" },
  bufferSize: 100,
  maxRedeliveryCount: 3,
})

Push Mode

Push mode is the recommended approach. The pump automatically manages the reserve, process, and acknowledge lifecycle for you.

const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1"],
  },
  stateManager: {
    getState: () => null,
    setState: (state) => {
      // Persist state for resume capability
    },
  },
  processor: {
    concurrency: 5,
    handler: async (events) => {
      // Your business logic here
      await processEvents(events)
      // Pump auto-acknowledges on success
    },
    failedHandler: async (failedEvents) => {
      // Handle events that exceeded maxRedeliveryCount
      console.error("Permanent failures:", failedEvents)
    },
  },
  notifier: { type: "websocket" },
})

Pull Mode

Pull mode gives you full manual control over the event lifecycle. Omit the processor configuration to enable it.

const dataPump = FlowcoreDataPump.create({
  auth: {
    apiKey: "your-api-key",
  },
  dataSource: {
    tenant: "tenant-name",
    dataCore: "data-core-name",
    flowType: "flow-type-name",
    eventTypes: ["event-1"],
  },
  stateManager: {
    getState: () => null,
    setState: (state) => {
      // Persist state
    },
  },
  notifier: { type: "websocket" },
})

// Manually control the lifecycle
const events = await dataPump.reserve(10)
const results = await Promise.allSettled(
  events.map((e) => processEvent(e))
)

// Pair each result with its event ID before filtering: after a filter,
// array indices no longer line up with the original events array
const outcomes = results.map((r, i) => ({ id: events[i].id, status: r.status }))
const successful = outcomes
  .filter((o) => o.status === "fulfilled")
  .map((o) => o.id)
const failed = outcomes
  .filter((o) => o.status === "rejected")
  .map((o) => o.id)

await dataPump.acknowledge(successful)
await dataPump.fail(failed)
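
When splitting Promise.allSettled results into successes and failures, each result lines up with its input event only by its position in the unfiltered array. The sketch below wraps that pairing in a reusable helper. The name partitionByOutcome and the HasId shape are assumptions made for illustration and are not part of the library.

```typescript
// Hypothetical helper: not part of @flowcore/data-pump.
interface HasId {
  id: string
}

function partitionByOutcome<T extends HasId>(
  events: T[],
  results: PromiseSettledResult<unknown>[],
): { successful: string[]; failed: string[] } {
  const successful: string[] = []
  const failed: string[] = []
  events.forEach((event, i) => {
    // results[i] corresponds to events[i] because Promise.allSettled
    // preserves input order; a missing result is treated as a failure
    const result = results[i]
    if (result !== undefined && result.status === "fulfilled") {
      successful.push(event.id)
    } else {
      failed.push(event.id)
    }
  })
  return { successful, failed }
}
```

With this helper, the manual lifecycle reduces to calling partitionByOutcome(events, results) and passing the two ID lists to acknowledge and fail.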

State Management

The state manager controls where the pump starts reading events. Have getState return null to start in live mode (new events only), or return a state object to resume from a specific position.

State Format

interface FlowcoreDataPumpState {
  timeBucket: string // "yyyyMMddHH0000" format
  eventId?: string // Optional event ID for precision
}

Time buckets use the format yyyyMMddHH0000: the date and hour, followed by four zeros. For example, 20240315140000 represents the hour beginning at 2:00 PM on March 15, 2024.
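
Building a starting state by hand means producing a time bucket string from a date. The helpers below are a minimal sketch of that conversion. Their names are hypothetical, and they assume buckets are expressed in UTC, which this page does not state.

```typescript
// Hypothetical helpers: not part of @flowcore/data-pump.
// Assumption: time buckets are expressed in UTC.
function toTimeBucket(date: Date): string {
  const pad = (n: number, width = 2) => String(n).padStart(width, "0")
  return (
    pad(date.getUTCFullYear(), 4) +
    pad(date.getUTCMonth() + 1) +
    pad(date.getUTCDate()) +
    pad(date.getUTCHours()) +
    "0000" // minutes and seconds are always zero in a bucket name
  )
}

function fromTimeBucket(timeBucket: string): Date {
  const match = /^(\d{4})(\d{2})(\d{2})(\d{2})0000$/.exec(timeBucket)
  if (!match) throw new Error(`Invalid time bucket: ${timeBucket}`)
  const [, year, month, day, hour] = match
  return new Date(Date.UTC(Number(year), Number(month) - 1, Number(day), Number(hour)))
}

// Any moment within the hour maps to the same bucket
console.log(toTimeBucket(new Date("2024-03-15T14:37:12Z"))) // "20240315140000"
```

A resume state could then be written as { timeBucket: toTimeBucket(someDate) }, leaving eventId unset to start at the beginning of that bucket.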

Memory-Based (Development)

let currentState: FlowcoreDataPumpState | null = null

const dataPump = FlowcoreDataPump.create({
  // ...other config
  stateManager: {
    getState: () => currentState,
    setState: (state) => {
      currentState = state
    },
  },
})

Database-Based (Production)

const dataPump = FlowcoreDataPump.create({
  // ...other config
  stateManager: {
    getState: async () => {
      const result = await db.query(
        "SELECT time_bucket, event_id FROM pump_state WHERE id = ?",
        ["main"]
      )
      return result[0]
        ? { timeBucket: result[0].time_bucket, eventId: result[0].event_id ?? undefined }
        : null
    },
    setState: async (state) => {
      // Upsert the single "main" row (INSERT OR REPLACE is SQLite syntax)
      await db.query(
        "INSERT OR REPLACE INTO pump_state (id, time_bucket, event_id) VALUES (?, ?, ?)",
        ["main", state.timeBucket, state.eventId ?? null]
      )
    },
  },
})
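
Whatever the storage backend, it can be worth checking a persisted state before handing it back to the pump. The sketch below wraps a generic key-value store in a state manager that serializes to JSON and validates the time bucket format on load. KeyValueStore and createStateManager are hypothetical names introduced here, not library APIs, and the in-memory Map stands in for a real store.

```typescript
// Hypothetical sketch: KeyValueStore and createStateManager are not part
// of @flowcore/data-pump.
interface FlowcoreDataPumpState {
  timeBucket: string // "yyyyMMddHH0000" format
  eventId?: string
}

interface KeyValueStore {
  get(key: string): string | undefined
  set(key: string, value: string): void
}

const TIME_BUCKET_PATTERN = /^\d{10}0000$/

function createStateManager(store: KeyValueStore, key = "main") {
  return {
    getState: (): FlowcoreDataPumpState | null => {
      const raw = store.get(key)
      if (raw === undefined) return null // nothing saved: start in live mode
      const parsed = JSON.parse(raw) as Partial<FlowcoreDataPumpState>
      const timeBucket = parsed.timeBucket
      if (typeof timeBucket !== "string" || !TIME_BUCKET_PATTERN.test(timeBucket)) {
        throw new Error(`Corrupt pump state under key "${key}"`)
      }
      return typeof parsed.eventId === "string"
        ? { timeBucket, eventId: parsed.eventId }
        : { timeBucket }
    },
    setState: (state: FlowcoreDataPumpState): void => {
      store.set(key, JSON.stringify(state))
    },
  }
}

// In-memory store for demonstration; a real deployment would persist it
const memory = new Map<string, string>()
const stateManager = createStateManager({
  get: (k) => memory.get(k),
  set: (k, v) => {
    memory.set(k, v)
  },
})
```

The returned object has the getState / setState shape shown above, so it could be passed as the stateManager option. Failing loudly on a malformed state is a design choice in this sketch, not documented pump behavior.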

Authentication

API Key

auth: {
  apiKey: "your-api-key",
}

OIDC / Bearer Token

auth: {
  getBearerToken: () =>
    oidc.getToken().then((token) => token.accessToken),
}
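
This page does not say how often the pump invokes getBearerToken. If it is called frequently, caching the token until shortly before it expires avoids a round trip to the identity provider on every call. The wrapper below is a minimal sketch of that idea. cachedBearerToken, TokenResponse, and the expiresInSeconds field are assumptions for illustration and do not come from the library or from any particular OIDC client.

```typescript
// Hypothetical sketch: cachedBearerToken and TokenResponse are not part of
// @flowcore/data-pump or any OIDC library.
interface TokenResponse {
  accessToken: string
  expiresInSeconds: number
}

function cachedBearerToken(
  fetchToken: () => Promise<TokenResponse>,
  refreshMarginSeconds = 30,
  now: () => number = Date.now, // injectable clock, useful in tests
): () => Promise<string> {
  let cached: { token: string; expiresAt: number } | null = null
  return async () => {
    // Reuse the token until it is within the refresh margin of expiry
    if (cached !== null && now() < cached.expiresAt - refreshMarginSeconds * 1000) {
      return cached.token
    }
    const response = await fetchToken()
    cached = {
      token: response.accessToken,
      expiresAt: now() + response.expiresInSeconds * 1000,
    }
    return cached.token
  }
}
```

The returned function has the same shape as getBearerToken, so it could be supplied in the auth option. Concurrent callers that arrive while a refresh is in flight will each trigger a fetch in this sketch; sharing the pending promise would avoid that.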

Configuration Reference

Option              Type     Description
auth                object   API key or bearer token authentication (required)
dataSource          object   Tenant, data core, flow type, and event types to consume (required)
stateManager        object   getState / setState callbacks for position tracking (required)
processor           object   Push mode handler and concurrency config (omit for pull mode)
notifier            object   Notification transport, e.g. { type: "websocket" }
bufferSize          number   In-memory buffer capacity between fetching and processing
maxRedeliveryCount  number   Maximum retry attempts before an event is sent to failedHandler
noTranslation       boolean  Skip event payload translation when true
concurrency         number   Number of events processed in parallel (set inside processor)