diff --git a/docker-compose.yaml b/docker-compose.yaml index 6ec414fe24..1c5d905ce7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ version: "3.8" services: postgres: - image: "postgres:12" + image: "postgres:14" container_name: postgres restart: always environment: @@ -14,4 +14,4 @@ services: - data:/var/lib/postgresql/data volumes: - data: \ No newline at end of file + data: diff --git a/plugins/incremental-ingestion-backend/README.md b/plugins/incremental-ingestion-backend/README.md index 1b25df1ce3..f11998f361 100644 --- a/plugins/incremental-ingestion-backend/README.md +++ b/plugins/incremental-ingestion-backend/README.md @@ -39,15 +39,15 @@ This approach has the following benefits, ```ts const builder = CatalogBuilder.create(env); // incremental builder receives builder because it'll register - // incremental entity providers with the builder + // incremental entity providers with the builder const incrementalBuilder = IncrementalCatalogBuilder.create(env, builder); ``` -3. Last step, add `await incrementBuilder.build()` after `await builder.build()` to ensure that all `CatalogBuider` migration run before running `incrementBuilder.build()` migrations. +3. Last step, add `await incrementalBuilder.build()` after `await builder.build()` to ensure that all `CatalogBuilder` migrations run before running `incrementalBuilder.build()` migrations. 
```ts const { processingEngine, router } = await builder.build(); - // this has to run after `await builder.build()` so ensure that catalog migrations are completed - // before incremental builder migrations are executed + // this has to run after `await builder.build()` so ensure that catalog migrations are completed + // before incremental builder migrations are executed await incrementalBuilder.build(); ``` @@ -67,18 +67,18 @@ import { PluginEnvironment } from '../types'; export default async function createPlugin( env: PluginEnvironment, ): Promise { - + const builder = CatalogBuilder.create(env); // incremental builder receives builder because it'll register - // incremental entity providers with the builder + // incremental entity providers with the builder const incrementalBuilder = IncrementalCatalogBuilder.create(env, builder); builder.addProcessor(new ScaffolderEntitiesProcessor()); const { processingEngine, router } = await builder.build(); - // this has to run after `await builder.build()` so ensure that catalog migrations are completed - // before incremental builder migrations are executed + // this has to run after `await builder.build()` so ensure that catalog migrations are completed + // before incremental builder migrations are executed await incrementalBuilder.build(); await processingEngine.start(); @@ -186,9 +186,9 @@ If you need to pass a token to your API, then you can create a constructor that ```ts export class MyIncrementalEntityProvider implements IncrementalEntityProvider { - + token: string; - + construtor(token: string) { this.token = token; } @@ -211,9 +211,9 @@ The last step is to implement the actual `next` method that will accept the curs ```ts export class MyIncrementalEntityProvider implements IncrementalEntityProvider { - + token: string; - + construtor(token: string) { this.token = token; } @@ -276,7 +276,7 @@ export class MyIncrementalEntityProvider implements IncrementalEntityProvider, options: 
IncrementalEntityProviderOptions, ) { + // TODO Check if build was called and throw error const { burstInterval, burstLength, restLength } = options; const { logger: catalogLogger, database, scheduler } = this.env; const ready = this.ready; @@ -57,4 +58,4 @@ export class IncrementalCatalogBuilder { }, }); } -} \ No newline at end of file +} diff --git a/plugins/incremental-ingestion-backend/src/iteration-db.ts b/plugins/incremental-ingestion-backend/src/iteration-db.ts index f44bd06460..7a935c0adb 100644 --- a/plugins/incremental-ingestion-backend/src/iteration-db.ts +++ b/plugins/incremental-ingestion-backend/src/iteration-db.ts @@ -84,6 +84,7 @@ export async function createIterationDB(options: IterationDBOptions): Promise { + const factory = new ClientFactory(); + let catalog: CatalogClient; + let stop: () => Promise; + let rebuild: () => Operation; + + beforeAll(function* () { + const logger = yield useLogger() + const config = new ConfigReader(backstageConfig); + const reader = UrlReaders.default({ logger, config }); + const databaseManager = DatabaseManager.fromConfig(config); + const discovery = SingleHostDiscovery.fromConfig(config); + const tokenManager = ServerTokenManager.noop(); + const permissions = ServerPermissionClient.fromConfig(config, { discovery, tokenManager }); + const scheduler = TaskScheduler.fromConfig(config).forPlugin('catalog'); + const env: PluginEnvironment = { + logger, + database: databaseManager.forPlugin('catalog'), + config, + reader, + permissions, + scheduler + }; + const { builder, incrementalBuilder } = yield useIncrementalBuilder({ ...env, factory }) + ;({ stop, rebuild } = yield useCatalog({ ...env, builder, incrementalBuilder, discovery })); + catalog = new CatalogClient({ discoveryApi: discovery }); + }); + + + beforeEach(function* () { + yield stop(); + yield rebuild(); + }) + + // TODO example of scenario + // get 5 entities => get an error => start again => get 4 entities instead of 5 + + describe('successfully ingest 
data', function* () { + beforeAll(function* () { + yield factory.createClient([ + { id: 1, data: ['a', 'b', 'c', 'd', 'e'] }, + ]); + }) + + it.eventually('test', function* () { + const { items } = yield catalog.getEntities() + + expect(items).toHaveLength(5) + }) + }); + + jest.setTimeout(15000); + it('successfuly ingest data', function* () { + yield factory.createClient([ + { id: 1, data: ['a', 'b', 'c', 'd', 'e'], delay: 10 }, + ]); + yield new Promise(resolve => setTimeout(resolve, 1000)); + console.dir(yield catalog.getEntities(), { depth: 10 }); + }) + it('successfuly ingest data 2', function* () { + yield factory.createClient([ + { id: 1, data: ['1', '2', '3', '4', '5'], delay: 10 }, + ]); + yield new Promise(resolve => setTimeout(resolve, 1000)); + console.dir(yield catalog.getEntities(), { depth: 10 }); + }) +}) diff --git a/plugins/incremental-ingestion-backend/src/tests/setupTests.ts b/plugins/incremental-ingestion-backend/src/tests/setupTests.ts new file mode 100644 index 0000000000..ce5f69cdae --- /dev/null +++ b/plugins/incremental-ingestion-backend/src/tests/setupTests.ts @@ -0,0 +1,219 @@ +/* eslint-disable func-names */ +import express from 'express'; +import Router from 'express-promise-router'; +import { CatalogBuilder, CatalogProcessingEngine } from '@backstage/plugin-catalog-backend'; +import { ensure, once, Operation } from 'effection'; +import { Duration } from 'luxon'; +import { createLogger, Logger, transports } from 'winston'; +import { EntityIteratorResult, IncrementalCatalogBuilder } from '..'; +import { IncrementalEntityProvider, IncrementalEntityProviderOptions, PluginEnvironment } from '../types'; +import { createServiceBuilder, SingleHostDiscovery } from '@backstage/backend-common'; + +type Instruction = DataInstruction | ErrorInstruction; + +interface DataInstruction { + type: 'data'; + data: string[]; + totalPages: number; + delay?: number; +} + +interface ErrorInstruction { + type: 'error'; + delay?: number; +} + +interface 
SuccessResponse { + status: 'success'; + data: string[]; + totalPages: number; +} + +interface ErrorResponse { + status: 'error'; + error: string; +} + +type Response = SuccessResponse | ErrorResponse; + +interface Client { + fetch(page: number): Promise; +} + +function delay(ms: number = 0) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export class ClientFactory { + private resolve: () => void = () => {}; + client: Client = { fetch() { throw new Error('Client is not ready') } }; + + createClient(instructions: Instruction[]): Promise { + let instruction: Instruction | undefined; + let restInstruction: Instruction[] = instructions; + const pages: string[][] = []; + + this.client = { + fetch: async (page: number) => { + ([instruction, ...restInstruction] = restInstruction); + if (instruction === undefined) { + this.resolve(); + return { status: 'success', data: pages[page], totalPages: pages.length }; + } + await delay(instruction.delay); + if (instruction.type === 'data') { + pages[page] = instruction.data; + return { status: 'success', data: instruction.data, totalPages: instruction.totalPages }; + } + return { status: 'error', error: '¯\\_(ツ)_/¯' }; + } + } + return new Promise(resolve => (this.resolve = resolve)); + } +} + +class EntityProvider implements IncrementalEntityProvider { + getProviderName() { return 'EntityProvider' } + + constructor(private factory: ClientFactory) { + + } + + async around(burst: (client: Client) => Promise): Promise { + await burst(this.factory.client); + } + + async next(client: Client, page: number = 0): Promise> { + const response = await client.fetch(page); + if (response.status === 'error') throw new Error(response.error); + + const nextPage = page + 1; + const done = nextPage > response.totalPages; + const entities = response.data.map(item => ({ + entity: { + apiVersion: 'backstage.io/v1beta1', + kind: 'Component', + metadata: { + name: item, + annotations: { + // You need to define these, otherwise they'll 
fail validation + 'backstage.io/managed-by-location': `test:${this.getProviderName()}`, + 'backstage.io/managed-by-origin-location': `test:${this.getProviderName()}`, + } + }, + spec: { + type: 'service', + lifecycle: 'production', + owner: 'guest', + } + } + })); + + return { + done, + entities, + cursor: nextPage + } + } +} + +export function useCatalog(env: PluginEnvironment & { discovery: SingleHostDiscovery, builder: CatalogBuilder, incrementalBuilder: IncrementalCatalogBuilder }): Operation<{ stop: () => Promise, rebuild: Operation }> { + return { + name: 'Catalog', + *init() { + let processingEngine: CatalogProcessingEngine; + let router: express.Router; + ;({ processingEngine, router } = yield env.builder.build()); + const proxyfiedRouter = new Proxy(router, { + apply(_target, thisArg, argArray: Parameters) { + return router.apply(thisArg, argArray); + }, + get(_target, prop: keyof express.Router) { + return router[prop] + }, + }) + + yield env.incrementalBuilder.build() + yield processingEngine.start(); + yield ensure(() => processingEngine.stop()); + + const apiRouter = Router(); + apiRouter.use('/catalog', proxyfiedRouter); + + const service = createServiceBuilder(module) + .loadConfig(env.config) + .addRouter('/api', apiRouter) + + yield service.start(); + + return { + stop: () => processingEngine.stop(), + rebuild: () => ({ + name: 'Rebuild Catalog', + *init() { + const client = yield env.database.getClient() + + yield client.raw('TRUNCATE TABLE public.knex_migrations CASCADE'); + yield client.raw('TRUNCATE TABLE public.knex_migrations_lock CASCADE'); + yield client.raw('DROP TABLE public.final_entities CASCADE'); + yield client.raw('DROP TABLE public.locations CASCADE'); + yield client.raw('DROP TABLE public.location_update_log CASCADE'); + yield client.raw('DROP TABLE public.refresh_state CASCADE'); + yield client.raw('DROP TABLE public.refresh_state_references CASCADE'); + yield client.raw('DROP TABLE public.relations CASCADE'); + yield 
client.raw('DROP TABLE public.search CASCADE'); + yield client.raw('TRUNCATE TABLE ingestion.knex_migrations CASCADE'); + yield client.raw('TRUNCATE TABLE ingestion.knex_migrations_lock CASCADE'); + yield client.raw('DROP TABLE ingestion.ingestions CASCADE'); + yield client.raw('DROP TABLE ingestion.ingestion_marks CASCADE'); + yield client.raw('DROP TABLE ingestion.ingestion_mark_entities CASCADE'); + + ;({ processingEngine, router } = yield env.builder.build()); + yield env.incrementalBuilder.build(); + yield processingEngine.start(); + } + }) + } + } + } +} + +export function useIncrementalBuilder(env: PluginEnvironment & { factory: ClientFactory }): Operation<{ builder: CatalogBuilder, incrementalBuilder: IncrementalCatalogBuilder }> { + return { + name: "IncrementalBuilder", + *init() { + const builder = CatalogBuilder.create(env); + const incrementalBuilder = IncrementalCatalogBuilder.create(env, builder); + + const provider = new EntityProvider(env.factory); + const schedule: IncrementalEntityProviderOptions = { + burstInterval: Duration.fromObject({ milliseconds: 100 }), + burstLength: Duration.fromObject({ milliseconds: 100 }), + restLength: Duration.fromObject({ seconds: 1 }), + } + + incrementalBuilder.addIncrementalEntityProvider(provider, schedule); + + return { builder, incrementalBuilder }; + } + } +} + +export function useLogger(): Operation { + return { + name: "Logger", + *init() { + const transport = new transports.Console(); + const logger = createLogger({ + level: 'error', + transports: [transport], + }); + yield ensure(function* () { + logger.end(); + logger.on('error', () => { /* noop */ }); + yield once(logger, 'finish') + }); + return logger + } + } +}