Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions PR_DESCRIPTION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# feat(ingestion): implementar fundação do módulo de ingestão com TimescaleDB e dual-write Discord

**Resolve:** #299 (Fase 1 — somente Discord)

## Contexto

Hoje cada módulo `integration-*` escreve diretamente no Postgres transacional usado por login, painéis, gamification e identity. Com ~3.4 GB de dados de atividade acumulados e novos provedores planejados (Instagram, WhatsApp), essa coabitação é insustentável.

Este PR inicializa o módulo `ingestion` como futuro ponto único de entrada para dados de todos os provedores, apoiado por uma instância dedicada de TimescaleDB. O escopo é deliberadamente limitado ao **Discord** — Twitch, GitHub e Dev.to virão em PRs separados após o time alinhar onde os DTOs de cada provedor devem morar (`integration-*` ou `ingestion`).

## O que mudou

### Infraestrutura

| Arquivo | Alteração |
| --------------------- | --------------------------------------------------------------------------------------------------- |
| `docker-compose.yml` | Adicionado serviço `he4rtbot-timescaledb` (TimescaleDB HA na porta `5436`) com volume e healthcheck |
| `config/database.php` | Adicionada conexão `timescaledb` apontando para a nova instância |
| `composer.json` | Registrado `he4rt/ingestion` como dependência do módulo |

### Novo módulo: `app-modules/ingestion/`

| Arquivo | Finalidade |
| --------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `database/migrations/…_create_timescaledb_tables.php` | Cria `raw_payloads` (cofre append-only), hypertable `messages` (particionada por `sent_at`) e hypertable `voice_messages` (particionada por `occurred_at`) |
| `Providers/IngestionServiceProvider.php` | Escuta o evento `discord.message.received` e registra o comando de backfill |
| `Listeners/ProcessRawDiscordMessage.php` | Listener na fila `ingestion` — salva o JSON original em `raw_payloads`, depois roda o Transform |
| `Actions/TransformDiscordMessage.php` | Usa o `DiscordMessageDTO::fromDump()` + `toDatabase()` existente para garantir paridade campo a campo com o insert legado no Postgres |
| `Models/RawPayload.php` | Model Eloquent apontando para a conexão `timescaledb` |
| `Models/Message.php` | Model Eloquent com override de PK composta (`id` + `sent_at`) exigida pelo TimescaleDB |
| `Console/Commands/BackfillPostgresToTimescaleCommand.php` | Cópia chunked e idempotente (`upsert`) do histórico de mensagens do Postgres → TimescaleDB |
| `tests/Feature/DualWriteAndBackfillTest.php` | Valida a corretude do backfill e o fluxo completo evento → raw_payload → hypertable |

### Arquivos modificados

| Arquivo | Alteração |
| -------------------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| `MessageReceivedEvent.php` | Adicionado `event('discord.message.received', ['raw_payload' => …])` no topo do handler — persistência legada intacta (dual-write) |

## Arquitetura

```
Discord Bot (WS)
├─ event('discord.message.received') ← NOVO (async, fila)
│ │
│ ▼
│ ProcessRawDiscordMessage (queue: ingestion)
│ │
│ ├─ RawPayload::create() → TimescaleDB.raw_payloads
│ └─ TransformDiscordMessage → TimescaleDB.messages (hypertable)
└─ Persistência legada (sync) → Postgres.messages ← INTOCADO
```

Ambos os caminhos rodam de forma independente. O Postgres legado continua sendo a fonte da verdade até o backfill atingir paridade (Decisão 10).

## O que NÃO está neste PR

- Ingestão de mensagens de voz (a tabela `voice_messages` foi criada para adiantar a estrutura, mas o listener e o ETL de voz virão na próxima fase).
- Ingestão de Twitch / GitHub / Dev.to (pendente discussão sobre ownership dos DTOs)
- Continuous aggregates / views materializadas
- Feature flag para cutover do dual-write
- Migração das queries do dashboard (`external_identity_id` → `external_account_id`)

## Como testar

```bash
# Suba o banco novo e rode as migrations
docker compose up -d he4rtbot-timescaledb
php artisan migrate --path=app-modules/ingestion/database/migrations --database=timescaledb

# Inicie o worker da nova fila e o bot
php artisan queue:work --queue=ingestion,default
php artisan bot:boot

# Em outro terminal, teste a idempotência do backfill
php artisan ingestion:backfill-postgres-timescale
```

## Notas de deploy

> [!WARNING]
> Produção requer uma instância de TimescaleDB e workers escutando a fila `ingestion`. O dashboard e o sistema de XP continuam lendo do Postgres legado — nenhuma mudança de comportamento para o usuário final.
8 changes: 8 additions & 0 deletions app-modules/bot-discord/src/Events/MessageReceivedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public function handle(Message $message): void
}

try {
/**
* Dispatches the raw message payload to the ingestion queue for
* asynchronous processing (activity tracking, telemetry, moderation).
*/
event('discord.message.received', [
'raw_payload' => $message->getRawAttributes(),
]);

$tenantProvider = resolve(ResolveDiscordTenant::class)->handle((string) $message->guild_id);

// Activity tracking — records message for XP/gamification regardless of moderation outcome.
Expand Down
27 changes: 27 additions & 0 deletions app-modules/ingestion/composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "he4rt/ingestion",
"description": "",
"type": "library",
"version": "1.0.0",
"license": "proprietary",
"autoload": {
"psr-4": {
"He4rt\\Ingestion\\": "src/",
"He4rt\\Ingestion\\Database\\Factories\\": "database/factories/",
"He4rt\\Ingestion\\Database\\Seeders\\": "database/seeders/"
}
},
"autoload-dev": {
"psr-4": {
"He4rt\\Ingestion\\Tests\\": "tests/"
}
},
"minimum-stability": "stable",
"extra": {
"laravel": {
"providers": [
"He4rt\\Ingestion\\Providers\\IngestionServiceProvider"
]
}
}
}
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
protected $connection = 'timescaledb';

public function up(): void
{
DB::connection('timescaledb')->statement('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;');

Schema::connection('timescaledb')->dropIfExists('voice_messages');
Schema::connection('timescaledb')->dropIfExists('messages');
Schema::connection('timescaledb')->dropIfExists('raw_payloads');
Comment on lines +18 to +20

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Data loss risk: migration drops existing tables.

dropIfExists in up() destroys all data if the migration is re-run. Migrations should be idempotent. Remove these drops or guard them with environment checks.

🛡️ Proposed fix
         DB::connection('timescaledb')->statement('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;');
 
-        Schema::connection('timescaledb')->dropIfExists('voice_messages');
-        Schema::connection('timescaledb')->dropIfExists('messages');
-        Schema::connection('timescaledb')->dropIfExists('raw_payloads');
-
         Schema::connection('timescaledb')->create('raw_payloads', function (Blueprint $table): void {

Alternatively, if these drops are intentional for local development only, add a guard:

         DB::connection('timescaledb')->statement('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;');
 
+        if (app()->environment('local', 'testing')) {
         Schema::connection('timescaledb')->dropIfExists('voice_messages');
         Schema::connection('timescaledb')->dropIfExists('messages');
         Schema::connection('timescaledb')->dropIfExists('raw_payloads');
+        }
 
         Schema::connection('timescaledb')->create('raw_payloads', function (Blueprint $table): void {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Schema::connection('timescaledb')->dropIfExists('voice_messages');
Schema::connection('timescaledb')->dropIfExists('messages');
Schema::connection('timescaledb')->dropIfExists('raw_payloads');
DB::connection('timescaledb')->statement('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;');
Schema::connection('timescaledb')->create('raw_payloads', function (Blueprint $table): void {
// raw_payloads table definition...
});
// ... rest of migration code
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@app-modules/ingestion/database/migrations/2026_06_09_000000_create_timescaledb_tables.php`
around lines 18 - 20, The migration's up() currently calls
Schema::connection('timescaledb')->dropIfExists('voice_messages'),
->dropIfExists('messages') and ->dropIfExists('raw_payloads'), which will
destroy existing data when re-run; remove these dropIfExists calls from the up()
method (or move them to down()) so the migration is idempotent, or wrap them
with an explicit environment guard (e.g., check app environment like
app()->environment('local') or a config flag) so drops only occur in local/dev;
update the migration file to reference the same
Schema::connection('timescaledb') block but without unguarded dropIfExists calls
and ensure down() performs safe teardown if needed.


Schema::connection('timescaledb')->create('raw_payloads', function (Blueprint $table): void {
$table->uuid('id')->primary();
$table->string('provider');
$table->string('event_type');
$table->jsonb('payload');
$table->timestampsTz();
});

Schema::connection('timescaledb')->create('messages', function (Blueprint $table): void {
$table->uuid('id');
$table->uuid('tenant_id')->nullable();
$table->string('external_identity_id');
$table->string('provider_message_id')->nullable();
$table->string('channel_id')->nullable();
$table->text('content');
$table->integer('obtained_experience')->default(0);
$table->jsonb('metadata')->nullable();
$table->unsignedInteger('reactions_count')->default(0);
$table->unsignedInteger('reactions_total')->default(0);
$table->string('kind')->nullable();
$table->smallInteger('raw_message_type')->nullable();
$table->string('source_kind')->nullable();
$table->boolean('is_pinned')->default(false);
$table->boolean('mentions_everyone')->default(false);
$table->smallInteger('mention_role_count')->default(0);
$table->timestampTz('edited_at')->nullable();
$table->string('reply_to_provider_message_id')->nullable();

// Time column for partitioning
$table->timestampTz('sent_at');
$table->timestampsTz();

// Composite primary key (required for Timescale hypertable)
$table->primary(['id', 'sent_at']);
});

DB::connection('timescaledb')->statement("SELECT create_hypertable('messages', 'sent_at');");

Schema::connection('timescaledb')->create('voice_messages', function (Blueprint $table): void {
$table->uuid('id');
$table->uuid('tenant_id')->nullable();
$table->string('external_identity_id');
$table->string('channel_name');
$table->string('channel_id')->nullable();
$table->string('state');
$table->integer('obtained_experience')->default(0);
$table->string('provider_message_id')->nullable();

$table->timestampTz('occurred_at');
$table->timestampsTz();

$table->primary(['id', 'occurred_at']);
});

DB::connection('timescaledb')->statement("SELECT create_hypertable('voice_messages', 'occurred_at');");
}

public function down(): void
{
Schema::connection('timescaledb')->dropIfExists('voice_messages');
Schema::connection('timescaledb')->dropIfExists('messages');
Schema::connection('timescaledb')->dropIfExists('raw_payloads');
}
};
Empty file.
2 changes: 2 additions & 0 deletions app-modules/ingestion/phpstan.ignore.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
parameters:
ignoreErrors: []
6 changes: 6 additions & 0 deletions app-modules/ingestion/phpstan.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
includes:
- phpstan.ignore.neon

parameters:
paths:
- src/
45 changes: 45 additions & 0 deletions app-modules/ingestion/src/Actions/TransformDiscordMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace He4rt\Ingestion\Actions;

use He4rt\Ingestion\Models\Message;
use He4rt\Ingestion\Models\RawPayload;
use He4rt\IntegrationDiscord\ETL\DTOs\DiscordMessageDTO;
use He4rt\IntegrationDiscord\Models\DiscordGuild;
use Illuminate\Support\Facades\Date;

class TransformDiscordMessage
{
public function execute(RawPayload $rawPayload): ?Message
{
$data = $rawPayload->payload;

if (blank($data['id'] ?? null) || blank(data_get($data, 'author.id'))) {
return null;
}

$dto = DiscordMessageDTO::fromDump($data);

$tenantId = null;
if (filled($data['guild_id'])) {
$tenantId = DiscordGuild::query()->where('discord_guild_id', $data['guild_id'])->value('tenant_id');
}

$extraColumns = [
'tenant_id' => $tenantId,
'external_identity_id' => $dto->authorDiscordId,
'raw_message_type' => $data['type'] ?? null,
'is_pinned' => $data['pinned'] ?? false,
'mentions_everyone' => $data['mention_everyone'] ?? false,
'mention_role_count' => count($data['mention_roles'] ?? []),
'edited_at' => isset($data['edited_timestamp'])
? Date::parse($data['edited_timestamp'])
: null,
'reply_to_provider_message_id' => $data['message_reference']['message_id'] ?? null,
];

return Message::query()->create($dto->toDatabase($extraColumns));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

namespace He4rt\Ingestion\Console\Commands;

use Illuminate\Console\Attributes\Description;
use Illuminate\Console\Attributes\Signature;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

#[Description('Backfills messages from the default PostgreSQL database to the new TimescaleDB hypertable.')]
#[Signature('ingestion:backfill-postgres-timescale {--chunk=1000 : The chunk size for messages}')]
final class BackfillPostgresToTimescaleCommand extends Command
{
public function handle(): int
{
$this->info('Starting message backfill from PostgreSQL to TimescaleDB...');

$chunkSize = (int) $this->option('chunk');
$processed = 0;

DB::connection('pgsql')->table('messages')
->orderBy('id')
->chunk($chunkSize, function ($messages) use (&$processed): void {
Comment on lines +20 to +25

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate --chunk before calling chunk().

Line 20 allows 0/negative values, which can break execution at Line 25. Enforce >= 1 and fail fast with a clear error.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@app-modules/ingestion/src/Console/Commands/BackfillPostgresToTimescaleCommand.php`
around lines 20 - 25, The code reads the CLI option into $chunkSize and passes
it to DB::chunk without validating it; ensure the
BackfillPostgresToTimescaleCommand validates the chunk size after computing
$chunkSize = (int) $this->option('chunk') and before calling
DB::connection(...)->table('messages')->orderBy('id')->chunk(...): if $chunkSize
< 1, fail fast by emitting a clear error (e.g., $this->error('Invalid --chunk
value; must be >= 1')) and return a non-zero exit code (or throw an
InvalidArgumentException) so the chunk() call never receives 0/negative values.

$payloads = [];

foreach ($messages as $msg) {
$payloads[] = [
'id' => $msg->id,
'tenant_id' => $msg->tenant_id,
'external_identity_id' => $msg->external_identity_id,
'provider_message_id' => $msg->provider_message_id,
'channel_id' => $msg->channel_id,
'content' => $msg->content,
'obtained_experience' => $msg->obtained_experience,
'metadata' => $msg->metadata,
'reactions_count' => $msg->reactions_count,
'reactions_total' => $msg->reactions_total,
'kind' => $msg->kind,
'raw_message_type' => $msg->raw_message_type,
'source_kind' => $msg->source_kind,
'is_pinned' => $msg->is_pinned,
'mentions_everyone' => $msg->mentions_everyone,
'mention_role_count' => $msg->mention_role_count,
'edited_at' => $msg->edited_at,
'reply_to_provider_message_id' => $msg->reply_to_provider_message_id,
'sent_at' => $msg->sent_at,
'created_at' => $msg->created_at,
'updated_at' => $msg->updated_at,
];
}

DB::connection('timescaledb')->table('messages')->upsert($payloads, ['id', 'sent_at']);

$processed += count($messages);
$this->info(sprintf('Processed %d messages...', $processed));
});
Comment on lines +23 to +58

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

chunk() on a mutating source can lose rows during backfill.

Lines 23-58 use offset pagination (chunk) while the legacy table may still receive writes during dual-write. This can skip records, causing incomplete migration even with upsert. Use a stable window (captured upper bound) plus deterministic cursor strategy.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@app-modules/ingestion/src/Console/Commands/BackfillPostgresToTimescaleCommand.php`
around lines 23 - 58, The current use of
DB::connection('pgsql')->table('messages')->orderBy('id')->chunk(...) can skip
rows if the source table is being written during backfill; replace the chunk()
pattern with a stable window + cursor loop: capture a fixed upper bound (e.g.,
$maxId = DB::connection('pgsql')->table('messages')->max('id')) before paging,
then loop using a deterministic cursor (e.g., where('id', '>',
$lastId)->where('id', '<=', $maxId)->orderBy('id')->limit($chunkSize)), build
$payloads and upsert into timescaledb, and set $lastId to the last processed
message id before the next iteration; update the code around the chunk usage in
BackfillPostgresToTimescaleCommand to implement this cursor-based pagination and
remove chunk().


$this->info('Backfill completed successfully!');

return self::SUCCESS;
}
}
40 changes: 40 additions & 0 deletions app-modules/ingestion/src/Listeners/ProcessRawDiscordMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace He4rt\Ingestion\Listeners;

use He4rt\Ingestion\Actions\TransformDiscordMessage;
use He4rt\Ingestion\Models\RawPayload;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Log;
use Throwable;

class ProcessRawDiscordMessage implements ShouldQueue
{
public string $queue = 'ingestion';

public function handle(array $payload): void
{
$rawPayload = $payload['raw_payload'] ?? $payload;

if (blank($rawPayload)) {
return;
}

$record = RawPayload::query()->create([
'provider' => 'discord',
'event_type' => 'message_create',
'payload' => $rawPayload,
]);

try {
(new TransformDiscordMessage)->execute($record);
} catch (Throwable $throwable) {
Log::error('[Ingestion] Failed to transform Discord message', [
'raw_payload_id' => $record->id,
'error' => $throwable->getMessage(),
]);
}
}
}
Loading