Extend the Flowcore CLI
The Flowcore CLI allows you to stream data from Flowcore. You can extend the CLI and add your own commands to fit your needs.
Add a new plugin
To create a new plugin you can use the Flowcore CLI
flowcore new plugin <plugin-name>
Then you can link the plugin to the Flowcore CLI to test it:
flowcore plugins link <path to plugin directory>
Rename the topic to the one you want to create a command for, for example stream.
If you run flowcore --help you should see your new command in the list.
Stream
To create a new stream output you need to add the @flowcore/cli-plugin-core package to your plugin.
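yarn add @flowcore/cli-plugin-core
or
npm install @flowcore/cli-plugin-core
Then you can create a new command under the src/commands/stream directory. The command uses an output service, which this example places in src/services/emoji.service.ts: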
import { OutputService, SourceEvent, StreamFlags } from "@flowcore/cli-plugin-core";import type { Logger } from "@flowcore/cli-plugin-config";import { ux } from "@oclif/core";
/**
 * Output processor that tags every streamed event with an emoji and logs it.
 *
 * All output goes through the injected logger; the service keeps no other state.
 */
export class EmojiService implements OutputService {
  constructor(private readonly logger: Logger) {}

  // this is called when the stream is done, but only if the stream is not live
  async done(): Promise<void> {
    this.logger.info(ux.colorize("green", "Done!!! 🥳"))
  }

  // this is called before the stream is started
  async start(): Promise<void> {
    this.logger.info(ux.colorize("green", "Starting stream... 🚀"))
  }

  // this is called for each error in the stream
  async error(error: Error): Promise<void> {
    this.logger.error(error.message)
  }

  // this is the description of the output processor, used in the help command
  getDescription(): string {
    return "Emojify the output"
  }

  // this is the name of the output processor
  getName(): string {
    return "emojify"
  }

  // this is called for each event in the stream
  async process(event: SourceEvent, streamFlags: StreamFlags, complete: () => void): Promise<void> {
    // copy the event and add the emoji field; the incoming event is not mutated
    const emojiEvent = {
      ...event,
      emoji: "😎",
    }
    this.logger.info(JSON.stringify(emojiEvent))

    // if you want to stop the stream on demand you can call complete()
    // this is useful if you want to stop the stream after a certain number of events
    // complete()
  }
}

// src/commands/stream/emojify.ts
import { Flags } from "@oclif/core"
import dayjs from "dayjs"
import customParseFormat from "dayjs/plugin/customParseFormat.js"
import { BaseStreamCommand, STREAM_ARGS } from "@flowcore/cli-plugin-core"import { EmojiService } from "../../services/emoji.service.js"
// Registers the customParseFormat plugin with dayjs.
// NOTE(review): nothing in this snippet calls dayjs directly — presumably the
// base stream command's time flags rely on it; confirm before removing.
dayjs.extend(customParseFormat)
/**
 * Stream command that sends events through the EmojiService output processor.
 *
 * Positional arguments come from STREAM_ARGS; parsed flags are forwarded to the
 * stream service unchanged, with `output` forced to "emojify".
 */
export default class EmojifyStream extends BaseStreamCommand<typeof EmojifyStream> {
  static args = STREAM_ARGS

  static description = "Emojify the output"
  static examples = [
    '<%= config.bin %> <%= command.id %> "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live',
  ]

  static flags = {
    // add flags to the command to pass to the output processor
  }

  public async run(): Promise<void> {
    const { args, flags } = await this.parse(EmojifyStream)

    const emojiService = new EmojiService(this.logger)

    // register the emoji service as an output processor
    this.streamService.registerOutputProcessor(emojiService)

    // initialize the stream service with the stream url and the output processor
    // NOTE(review): "emojify" is the same string EmojiService.getName() returns —
    // presumably how the stream service selects the processor; keep them in sync
    await this.streamService.init(args.STREAM, {
      ...flags,
      output: "emojify",
    })

    await this.streamService.startStream()
  }
}
Then you can run the command
flowcore stream emojify <STREAM URL> --no-live
And you should see the output logged to the console with the 😎 emoji in each event, and a done message at the end.