Extend the Flowcore CLI
The Flowcore CLI allows you to stream data from Flowcore. You can extend the CLI and add your own commands to fit your needs.
Add a new plugin
To create a new plugin, use the Flowcore CLI:
flowcore new plugin <plugin-name>
Then link the plugin to the Flowcore CLI so you can test it:
flowcore plugins link <path to plugin directory>
Rename the topic to the one you want to create a command for, for example `stream`.
If you run `flowcore --help`, you should see your new command in the list.
Stream
To create a new stream output, add the `@flowcore/cli-plugin-core` package to your plugin, using either yarn or npm:
yarn add @flowcore/cli-plugin-core
npm install @flowcore/cli-plugin-core
Then create an output service (for example `src/services/emoji.service.ts`) and a new command that uses it under the `src/commands/stream` directory.
// src/services/emoji.service.ts
import { OutputService, SourceEvent, StreamFlags } from "@flowcore/cli-plugin-core";
import type { Logger } from "@flowcore/cli-plugin-config";
import { ux } from "@oclif/core";

/**
 * Example output processor that adds an `emoji` field to every streamed event
 * and logs the result as JSON through the injected logger.
 */
export class EmojiService implements OutputService {
  /**
   * @param logger - logger used for all output produced by this processor
   */
  constructor(private readonly logger: Logger) {}

  // this is called when the stream is done, but only if the stream is not live
  async done(): Promise<void> {
    this.logger.info(ux.colorize("green", "Done!!! 🥳"))
  }

  // this is called before the stream is started
  async start(): Promise<void> {
    this.logger.info(ux.colorize("green", "Starting stream... 🚀"))
  }

  // this is called for each error in the stream
  async error(error: Error): Promise<void> {
    this.logger.error(error.message)
  }

  // this is the description of the output processor, used in the help command
  getDescription(): string {
    return "Emojify the output"
  }

  // this is the name of the output processor
  getName(): string {
    return "emojify"
  }

  // this is called for each event in the stream
  async process(event: SourceEvent, streamFlags: StreamFlags, complete: () => void): Promise<void> {
    // copy the event and attach the emoji; the incoming event is not mutated
    const emojiEvent = {
      ...event,
      emoji: "😎",
    }
    this.logger.info(JSON.stringify(emojiEvent))

    // if you want to stop the stream on demand you can call complete()
    // this is useful if you want to stop the stream after a certain number of events
    // complete()
  }
}
// src/commands/stream/emojify.ts
import { Flags } from "@oclif/core"
import dayjs from "dayjs"
import customParseFormat from "dayjs/plugin/customParseFormat.js"

import { BaseStreamCommand, STREAM_ARGS } from "@flowcore/cli-plugin-core"
import { EmojiService } from "../../services/emoji.service.js"

dayjs.extend(customParseFormat)

/**
 * `stream emojify` command: registers an EmojiService as an output processor
 * on the stream service, initializes the stream with the parsed arguments and
 * flags, and starts it.
 */
export default class EmojifyStream extends BaseStreamCommand<typeof EmojifyStream> {
  static args = STREAM_ARGS

  static description = "Emojify the output"
  static examples = [
    '<%= config.bin %> <%= command.id %> "https://flowcore.io/<org>/<data core>/*" -s 1y --no-live',
  ]

  static flags = {
    // add flags to the command to pass to the output processor
  }

  /**
   * Parses the command line, wires up the emoji output processor and runs the stream.
   */
  public async run(): Promise<void> {
    const { args, flags } = await this.parse(EmojifyStream)

    const emojiService = new EmojiService(this.logger)

    // register the emoji service as an output processor
    this.streamService.registerOutputProcessor(emojiService)

    // initialize the stream service with the stream url and the output processor
    // NOTE(review): "emojify" presumably has to match EmojiService.getName() — confirm
    await this.streamService.init(args.STREAM, {
      ...flags,
      output: "emojify",
    })

    await this.streamService.startStream()
  }
}
Then run the command:
flowcore stream emojify <STREAM URL> --no-live
You should see the output logged to the console with the 😎 emoji in each event, and a done message at the end.