From 3e3d8ce65ea7c23a25672e96e5a96267883b4b47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20B=C3=B6hne?= Date: Mon, 2 Feb 2026 13:03:08 +0100 Subject: [PATCH 1/2] RV-86: add rabbitmq to local ddev setup --- .ddev/.env.web | 2 + .ddev/addon-metadata/rabbitmq/manifest.yaml | 13 ++ .ddev/commands/host/rabbitmq | 40 +++++ .ddev/commands/rabbitmq/rabbitmq | 176 ++++++++++++++++++++ .ddev/commands/rabbitmq/rabbitmqadmin | 8 + .ddev/commands/rabbitmq/rabbitmqctl | 8 + .ddev/config.rabbitmq.yaml | 2 + .ddev/docker-compose.rabbitmq.yaml | 43 +++++ .ddev/rabbitmq/config.yaml | 25 +++ .ddev/rabbitmq/schema.json | 88 ++++++++++ composer.json | 1 + composer.lock | 81 ++++++++- config/packages/messenger.yaml | 4 +- 13 files changed, 485 insertions(+), 6 deletions(-) create mode 100644 .ddev/addon-metadata/rabbitmq/manifest.yaml create mode 100755 .ddev/commands/host/rabbitmq create mode 100755 .ddev/commands/rabbitmq/rabbitmq create mode 100755 .ddev/commands/rabbitmq/rabbitmqadmin create mode 100755 .ddev/commands/rabbitmq/rabbitmqctl create mode 100644 .ddev/config.rabbitmq.yaml create mode 100644 .ddev/docker-compose.rabbitmq.yaml create mode 100644 .ddev/rabbitmq/config.yaml create mode 100644 .ddev/rabbitmq/schema.json diff --git a/.ddev/.env.web b/.ddev/.env.web index 701f9e3c7..c5a402bcc 100644 --- a/.ddev/.env.web +++ b/.ddev/.env.web @@ -11,3 +11,5 @@ MERCURE_PUBLIC_URL=https://jitsi-admin.ddev.site:3000 MERCURE_JWT_SECRET=MDY3OTljNDM3MzRjMWU4ZmFkZTFlNzY5 MAILER_DSN=smtp://:@localhost:1025 + +MESSENGER_TRANSPORT_DSN=amqp://rabbitmq:rabbitmq@rabbitmq:5672/%2f/messages diff --git a/.ddev/addon-metadata/rabbitmq/manifest.yaml b/.ddev/addon-metadata/rabbitmq/manifest.yaml new file mode 100644 index 000000000..af860dbbe --- /dev/null +++ b/.ddev/addon-metadata/rabbitmq/manifest.yaml @@ -0,0 +1,13 @@ +name: rabbitmq +repository: ddev/ddev-rabbitmq +version: 0.2.0 +install_date: "2026-02-02T12:01:24+01:00" +project_files: + - docker-compose.rabbitmq.yaml + - config.rabbitmq.yaml + - commands/host/rabbitmq + - commands/rabbitmq/ + - rabbitmq/config.yaml + - rabbitmq/schema.json +global_files: [] +removal_actions: [] diff --git a/.ddev/commands/host/rabbitmq b/.ddev/commands/host/rabbitmq new file mode 100755 index 000000000..efd8271a2 --- /dev/null +++ b/.ddev/commands/host/rabbitmq @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +#ddev-generated + +## Description: A host wrapper command for rabbitmq/rabbitmq to enhance the command with "launch" capabilities +## Usage: rabbitmq +## Example: ddev rabbitmq + +CMD=$1 + +launch_rabbitmq_gui() { + echo "Launching RabbitMQ Management UI..." + echo "Login using 'rabbitmq' user and 'rabbitmq' password." + + PROTOCOL="HTTP" + if [ "${DDEV_PRIMARY_URL%://*}" != "http" ] && [ -z "${GITPOD_WORKSPACE_ID:-}" ] && [ "${CODESPACES:-}" != "true" ]; then + PROTOCOL="HTTPS" + fi + rabbitmq_port=$(ddev exec "yq '.services.rabbitmq.environment.${PROTOCOL}_EXPOSE | split(\":\")[0]' .ddev/.ddev-docker-compose-full.yaml") + ddev launch :$rabbitmq_port +} + +case $CMD in + launch) + launch_rabbitmq_gui + ;; + *) + if [ "$CMD" = "--help" ] || [ -z "$CMD" ]; then + # Output help and display additional information about the "launch" command + ddev exec -s rabbitmq /mnt/ddev_config/commands/rabbitmq/rabbitmq --help + echo "" + echo -e "\033[1mLaunch\033[0m" + echo "—————" + echo "Launch RabbitMQ Management UI" + echo "👉 ddev rabbitmq launch" + else + # Run the script within service container and pass in all arguments + ddev exec -s rabbitmq /mnt/ddev_config/commands/rabbitmq/rabbitmq $@ + fi + ;; +esac diff --git a/.ddev/commands/rabbitmq/rabbitmq b/.ddev/commands/rabbitmq/rabbitmq new file mode 100755 index 000000000..a9400aff4 --- /dev/null +++ b/.ddev/commands/rabbitmq/rabbitmq @@ -0,0 +1,176 @@ +#!/usr/bin/env bash +#ddev-generated + +## Description: Manage parts of rabbitmq +## Usage: rabbitmq +## Example: ddev rabbitmq + +CMD=$1 + +# Subcommands allowed to watch +ALLOWED_DISPLAY_ARGUMENTS=("overview" "connections" "channels" "consumers" "exchanges" "queues" "bindings" "users" "vhosts" "permissions" "nodes" "parameters" "policies" "operator_policies" "vhost_limits" ) + +YAML_FILE=/mnt/ddev_config/rabbitmq/config.yaml + +function watcher() { + subcommand=$1 + interval=$2 + + if [ "$subcommand" = "overview" ]; then + display_argument="show $subcommand" + else + display_argument="list $subcommand" + fi + + if [[ " ${ALLOWED_DISPLAY_ARGUMENTS[*]} " = *" $subcommand "* ]]; then + while true; do + output=$(rabbitmqadmin "$display_argument") + clear + echo "$output" + echo "Refresh interval: $interval sec - $(date)" + sleep $interval + done + else + echo -e "Watch subcommand '$subcommand' not allowed, use one of these:\n * ${ALLOWED_DISPLAY_ARGUMENTS[*]}" + fi +} + +function add_vhosts() { + vhosts_json=$(rabbitmqctl list_vhosts --formatter json) + readarray vhosts_existing < <(echo "$vhosts_json" | yq -o=y -I=0 '.[].name' -) + readarray vhosts < <(yq -o=j -I=0 '.vhost[]' $YAML_FILE ) + for vhost in "${vhosts[@]}"; do + name=$(echo "$vhost" | yq '.name // ""' -) + + if [[ ! " ${vhosts_existing[*]} " =~ $name ]]; then + description=$(echo "$vhost" | yq '.description // ""' -) + description_option=$([ -z "$description" ] && echo "" || echo "--description \"$description\"") + + default_queue_type=$(echo "$vhost" | yq '.default-queue-type // ""' -) + default_queue_type_option=$([ -z "$default_queue_type" ] && echo "" || echo "--default-queue-type $default_queue_type") + + tags=$(echo "$vhost" | yq '.tags[]' -) + comma_separated=$(echo "${tags[*]}" | xargs | sed -e 's/ /,/g') + tags_option=$([ -z "$comma_separated" ] && echo "" || echo "--tags $comma_separated") + + rabbitmqctl add_vhost $name $description_option $tags_option $default_queue_type_option || exit 1 + else + echo "ℹ️ vhost '$name' already exists! To update a vhost please delete it first." + fi + done +} + +function add_queues() { + readarray queues < <(yq -o=j -I=0 '.queue[]' $YAML_FILE ) + for queue in "${queues[@]}"; do + name=$(echo "$queue" | yq '.name' -) + vhost=$(echo "$queue" | yq '.vhost // "/"' -) + + queues_existing=$(rabbitmqctl list_queues --silent --formatter json --vhost "$vhost") + readarray queues_existing < <(echo "$queues_existing" | yq -o=y -I=0 '.[].name' -) + + if [[ ! " ${queues_existing[*]} " =~ $name ]]; then + durable=$(echo "$queue" | yq '.durable // "true"' -) + rabbitmqadmin declare queue --vhost="$vhost" --name="$name" --durable="$durable" || exit 1 + else + echo "ℹ️ Queue '$name' already exists in vhost '$vhost'! To update the queue please delete it first." + fi + done +} + +function add_users() { + users_json=$(rabbitmqctl list_users --silent --formatter json) + readarray users_existing < <(echo "$users_json" | yq -o=y -I=0 '.[].user' -) + readarray users < <(yq -o=j -I=0 '.user[]' $YAML_FILE ) + + for user in "${users[@]}"; do + name=$(echo "$user" | yq '.name // ""' -) + + if [[ ! " ${users_existing[*]} " =~ $name ]]; then + password=$(echo "$user" | yq '.password // ""' -) + rabbitmqctl add_user "$name" "$password" || exit 1 + + tags=$(echo "$user" | yq '.tags[]' -) + comma_separated=$(echo "${tags[*]}" | xargs | sed -e 's/ /,/g') + rabbitmqctl set_user_tags "$name" "$comma_separated" || exit 1 + + permissions=$(echo "$user" | yq '.permissions[]' -) + for permission in "${permissions[@]}"; do + vhost=$(echo "$permission" | yq '.vhost // "/"' -) + conf=$(echo "$permission" | yq '.conf // ".*"' -) + write=$(echo "$permission" | yq '.write // ".*"' -) + read=$(echo "$permission" | yq '.read // ".*"' -) + rabbitmqctl set_permissions -p "$vhost" "$name" "$conf" "$write" "$read" || exit 1 + done + else + echo "ℹ️ User '$name' already exists! To update a user please delete it first." + fi + done +} + +case $CMD in + apply) + echo "Apply config $YAML_FILE" + + plugins_array=$(yq eval '.plugins[]' "$YAML_FILE") + plugins=$(echo "${plugins_array[*]}" | tr '\n' ' ' | xargs) + rabbitmq-plugins enable $plugins + + add_vhosts + # Ensure the default admin "rabbitmq" has permissions for all virtual hosts + rabbitmqctl set_permissions_globally "rabbitmq" ".*" ".*" ".*" > /dev/null + add_queues + add_users + ;; + + wipe) + users_json=$(rabbitmqctl list_users --silent --formatter json) + readarray users_existing < <(echo "$users_json" | yq -o=y -I=0 '.[].user' -) + for user in "${users_existing[@]}"; do + user=$(echo -n "$user" | tr -d '\n') + if [ "$user" != "rabbitmq" ]; then + rabbitmqctl delete_user "$user" + fi + done + + vhosts_json=$(rabbitmqctl list_vhosts --formatter json) + readarray vhosts_existing < <(echo "$vhosts_json" | yq -o=y -I=0 '.[].name' -) + for host in "${vhosts_existing[@]}"; do + host=$(echo -n "$host" | tr -d '\n') + if [ "$host" != "/" ]; then + rabbitmqctl delete_vhost "$host" + fi + done + ;; + watch) + subcommand="$2" + subcommand=${subcommand:=overview} + interval="$3" + interval=${interval:=2} + + watcher "$subcommand" "$interval" + ;; + + --help|*) + echo "——————————————" + echo -e "\033[1mExample Usage:\033[0m" + echo "——————————————" + echo -e "\033[1mApply\033[0m" + echo "—————" + echo "Create queues, users and add 'plugins' according to configuration (see .ddev/rabbitmq/config.yaml)" + echo "👉 ddev rabbitmq apply" + echo "" + echo -e "\033[1mWipe\033[0m" + echo "—————" + echo "Clear vhosts, queues and users (only vhost '/' and user 'rabbitmq' are kept)" + echo "👉 ddev rabbitmq wipe" + echo "" + echo -e "\033[1mWatcher\033[0m" + echo "———————" + echo "A little wrapper around 'rabbitmqadmin' to be able to watch e.g. queues" + echo "👉 ddev rabbitmq watch " + echo -e "Possible values:\n * ${ALLOWED_DISPLAY_ARGUMENTS[*]}" + echo "" + echo -e "ℹ️ To use the rabbitmqadmin command run 'ddev rabbitmqadmin --help' for details.\nThis command passes all values to rabbitmqadmin within the container." + ;; +esac diff --git a/.ddev/commands/rabbitmq/rabbitmqadmin b/.ddev/commands/rabbitmq/rabbitmqadmin new file mode 100755 index 000000000..13b7c5075 --- /dev/null +++ b/.ddev/commands/rabbitmq/rabbitmqadmin @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +#ddev-generated + +## Description: Manage parts of rabbitmq +## Usage: rabbitmqadmin [options] subcommand +## Example: ddev rabbitmqadmin --help + +rabbitmqadmin $@ diff --git a/.ddev/commands/rabbitmq/rabbitmqctl b/.ddev/commands/rabbitmq/rabbitmqctl new file mode 100755 index 000000000..b85aa61ed --- /dev/null +++ b/.ddev/commands/rabbitmq/rabbitmqctl @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +#ddev-generated + +## Description: Manage parts of rabbitmq +## Usage: rabbitmqctl [--node ] [--timeout ] [--longnames] [--quiet] [] +## Example: ddev rabbitmqctl --help + +rabbitmqctl $@ diff --git a/.ddev/config.rabbitmq.yaml b/.ddev/config.rabbitmq.yaml new file mode 100644 index 000000000..efe6835c0 --- /dev/null +++ b/.ddev/config.rabbitmq.yaml @@ -0,0 +1,2 @@ +#ddev-generated +webimage_extra_packages: ["php${DDEV_PHP_VERSION}-amqp"] diff --git a/.ddev/docker-compose.rabbitmq.yaml b/.ddev/docker-compose.rabbitmq.yaml new file mode 100644 index 000000000..1449b8242 --- /dev/null +++ b/.ddev/docker-compose.rabbitmq.yaml @@ -0,0 +1,43 @@ +#ddev-generated +services: + rabbitmq: + container_name: ddev-${DDEV_SITENAME}-rabbitmq + hostname: ${DDEV_SITENAME}-rabbitmq + image: ${RABBITMQ_DOCKER_IMAGE:-rabbitmq:4-management}-${DDEV_SITENAME}-built + build: + dockerfile_inline: | + ARG RABBITMQ_DOCKER_IMAGE="scratch" + FROM $${RABBITMQ_DOCKER_IMAGE} + RUN command -v apk >/dev/null 2>&1 || { (apt-get update || true) && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*; } + ARG TARGETARCH + RUN command -v apk >/dev/null 2>&1 && apk add --no-cache yq || { curl -fsSL "https://github.com/mikefarah/yq/releases/latest/download/yq_linux_$${TARGETARCH}" -o /usr/bin/yq && chmod +x /usr/bin/yq; } + args: + RABBITMQ_DOCKER_IMAGE: ${RABBITMQ_DOCKER_IMAGE:-rabbitmq:4-management} + expose: + - 15672 + environment: + - VIRTUAL_HOST=$DDEV_HOSTNAME + - HTTP_EXPOSE=15672:15672 + - HTTPS_EXPOSE=15673:15672 + - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG + - RABBITMQ_DEFAULT_USER=rabbitmq + - RABBITMQ_DEFAULT_PASS=rabbitmq + - RABBITMQ_DEFAULT_VHOST=/ + - RABBITMQADMIN_USERNAME=rabbitmq + - RABBITMQADMIN_PASSWORD=rabbitmq + labels: + com.ddev.site-name: ${DDEV_SITENAME} + com.ddev.approot: ${DDEV_APPROOT} + volumes: + - "rabbitmq:/var/lib/rabbitmq/mnesia" + - ".:/mnt/ddev_config" + - "ddev-global-cache:/mnt/ddev-global-cache" + x-ddev: + describe-info: | + User: rabbitmq + Pass: rabbitmq + ssh-shell: bash + +volumes: + rabbitmq: + name: "${DDEV_SITENAME}_rabbitmq" diff --git a/.ddev/rabbitmq/config.yaml b/.ddev/rabbitmq/config.yaml new file mode 100644 index 000000000..4744999c6 --- /dev/null +++ b/.ddev/rabbitmq/config.yaml @@ -0,0 +1,25 @@ +#ddev-generated +vhost: + - name: ddev-vhost + default-queue-type: classic +queue: + - name: ddev-queue + vhost: ddev-vhost + durable: true + - name: ddev-additional + vhost: ddev-vhost + durable: true +user: + - name: ddev-admin + password: password + tags: + - administrator + - management + permissions: + - vhost: ddev-vhost + conf: .* + write: .* + read: .* +plugins: + # Required! + - rabbitmq_management diff --git a/.ddev/rabbitmq/schema.json b/.ddev/rabbitmq/schema.json new file mode 100644 index 000000000..4b89f49ea --- /dev/null +++ b/.ddev/rabbitmq/schema.json @@ -0,0 +1,88 @@ +{ + "#ddev-generated":true, + "$schema": "./config.yaml", + "type": "object", + "properties": { + "user": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "password": { + "type": "string" + }, + "tags": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "management", + "policymaker", + "monitoring", + "administrator" + ] + } + } + }, + "required": ["name", "password", "tags"] + } + }, + "queue": { + "type": "array", + "items": { + "type": "object", + "properties": { + "vhost": { + "type": "string", + "pattern": "^([a-zA-Z_\-\/]*)$" + }, + "name": { + "type": "string", + "pattern": "^([a-zA-Z_-]*)$" + }, + "durable": { + "type": "boolean" + } + }, + "required": ["name"] + } + }, + "plugins": { + "type": "array", + "items": { + "type": "string" + } + }, + "vhost": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string", + "pattern": "^([a-zA-Z_\-0-9]*)$" + }, + "default-queue-type": { + "type": "string", + "enum": [ + "classic", + "quorum", + "stream" + ] + }, + "tags": { + "type": "array", + "items": { + "type": "string", + "pattern": "^([a-zA-Z_\-0-9]*)$" + } + } + }, + "required": ["name", "default-queue-type"] + } + } + } +} diff --git a/composer.json b/composer.json index 002bf9e54..3c31d0a27 100644 --- a/composer.json +++ b/composer.json @@ -34,6 +34,7 @@ "ozdemirburak/iris": "^3.1", "phpdocumentor/reflection-docblock": "^5.2", "stevenmaguire/oauth2-keycloak": "^5.1", + "symfony/amqp-messenger": "7.2.*", "symfony/apache-pack": "^1.0", "symfony/asset": "7.2.*", "symfony/console": "7.2.*", diff --git a/composer.lock b/composer.lock index bedf23a62..b03f1cf96 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "372291d3dee568b23af143fd02a003c5", + "content-hash": "fa01050c0b2812fd5e5e1ccc5b2abe25", "packages": [ { "name": "agence104/livekit-server-sdk", @@ -5706,6 +5706,79 @@ }, "time": "2023-10-24T06:10:44+00:00" }, + { + "name": "symfony/amqp-messenger", + "version": "v7.2.9", + "source": { + "type": "git", + "url": "https://github.com/symfony/amqp-messenger.git", + "reference": "1b793bb3c9dfa2f7c352ac7b022da7a2c166f647" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/1b793bb3c9dfa2f7c352ac7b022da7a2c166f647", + "reference": "1b793bb3c9dfa2f7c352ac7b022da7a2c166f647", + "shasum": "" + }, + "require": { + "ext-amqp": "*", + "php": ">=8.2", + "symfony/messenger": "^6.4|^7.0" + }, + "require-dev": { + "symfony/event-dispatcher": "^6.4|^7.0", + "symfony/process": "^6.4|^7.0", + "symfony/property-access": "^6.4|^7.0", + "symfony/serializer": "^6.4|^7.0" + }, + "type": "symfony-messenger-bridge", + "autoload": { + "psr-4": { + "Symfony\\Component\\Messenger\\Bridge\\Amqp\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Symfony AMQP extension Messenger Bridge", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/amqp-messenger/tree/v7.2.9" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2025-07-15T11:30:57+00:00" + }, { "name": "symfony/apache-pack", "version": "v1.0.1", @@ -15453,7 +15526,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": {}, "prefer-stable": false, "prefer-lowest": false, "platform": { @@ -15463,6 +15536,6 @@ "ext-openssl": "*", "ext-zip": "*" }, - "platform-dev": [], - "plugin-api-version": "2.3.0" + "platform-dev": {}, + "plugin-api-version": "2.9.0" } diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml index b441a185a..553e54edd 100644 --- a/config/packages/messenger.yaml +++ b/config/packages/messenger.yaml @@ -7,8 +7,8 @@ framework: # https://symfony.com/doc/current/messenger.html#transport-configuration async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' - options: - check_delayed_interval: 1000 +# options: +# check_delayed_interval: 1000 # failed: 'doctrine://default?queue_name=failed' # sync: 'sync://' From ab52b78d577b429ce5275759b9b9e588e16e7b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20B=C3=B6hne?= Date: Mon, 2 Feb 2026 17:43:27 +0100 Subject: [PATCH 2/2] RV-87: add amqp test command to check amqp/rabbit queue --- .env | 1 + config/packages/messenger.yaml | 7 ++- config/services.yaml | 3 + src/Command/AmqpTestCommand.php | 74 +++++++++++++++++++++++ src/Controller/TestController.php | 26 ++++++++ src/Message/TestMessage.php | 12 ++++ src/MessageHandler/TestMessageHandler.php | 14 +++++ 7 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 src/Command/AmqpTestCommand.php create mode 100644 src/Controller/TestController.php create mode 100644 src/Message/TestMessage.php create mode 100644 src/MessageHandler/TestMessageHandler.php diff --git a/.env b/.env index 8b3c0c1e1..7a00e1067 100644 --- a/.env +++ b/.env @@ -258,6 +258,7 @@ WEBSOCKET_SECRET=DUMMY ###> symfony/messenger ### # Choose one of the transports below +# @TODO change to amqp with rabbitmq credentials MESSENGER_TRANSPORT_DSN=doctrine://default # MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml index 553e54edd..82808b094 100644 --- a/config/packages/messenger.yaml +++ b/config/packages/messenger.yaml @@ -7,8 +7,10 @@ framework: # https://symfony.com/doc/current/messenger.html#transport-configuration async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' -# options: -# check_delayed_interval: 1000 +# options: +# check_delayed_interval: 1000 + test: + dsn: '%env(MESSENGER_TRANSPORT_DSN)%' # failed: 'doctrine://default?queue_name=failed' # sync: 'sync://' @@ -19,3 +21,4 @@ framework: App\Message\LobbyLeaverMessage: async #App\Message\CustomMailerMessage: async #Symfony\Component\Mailer\Messenger\SendEmailMessage: async + App\Message\TestMessage: test diff --git a/config/services.yaml b/config/services.yaml index e3a39e864..648b46217 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -199,5 +199,8 @@ services: key: '%env(AWS_KEY)%' secret: '%env(AWS_SECRET_KEY)%' + App\Command\AmqpTestCommand: + arguments: + $testTransport: '@messenger.transport.test' diff --git a/src/Command/AmqpTestCommand.php b/src/Command/AmqpTestCommand.php new file mode 100644 index 000000000..a4215da59 --- /dev/null +++ b/src/Command/AmqpTestCommand.php @@ -0,0 +1,74 @@ +testTransport); + if (!str_contains($transportClass, 'Amqp')) { + $io->error("Transport is not AMQP! Found: $transportClass"); + + return Command::FAILURE; + } + + $randomId = rand(0, PHP_INT_MAX); + $this->messageBus->dispatch( + new TestMessage($randomId), + [new TransportNamesStamp(['test'])] + ); + + // small delay to ensure message is queued + usleep(self::WAIT_MICROSECONDS); + + // try multiple times as there might be other messages stuck in the queue + for ($i = 0; $i <= self::MAX_RETRIES; $i++) { + foreach ($this->testTransport->get() as $envelope) { + $message = $envelope->getMessage(); + + if ($message instanceof TestMessage && $message->randomNumber === $randomId) { + $this->testTransport->ack($envelope); + $io->success("Successfully dispatched and consumed test message!"); + + return Command::SUCCESS; + } + + // reject other test messages that might have been stuck + $this->testTransport->reject($envelope); + } + } + + $io->error("Failed to retrieve test message from queue"); + + return Command::FAILURE; + } +} diff --git a/src/Controller/TestController.php b/src/Controller/TestController.php new file mode 100644 index 000000000..3758d3bae --- /dev/null +++ b/src/Controller/TestController.php @@ -0,0 +1,26 @@ +messageBus->dispatch(new LobbyLeaverMessage('123')); + + return new Response('

allet klar nich wahr

'); + } +} diff --git a/src/Message/TestMessage.php b/src/Message/TestMessage.php new file mode 100644 index 000000000..9d5ab0bc8 --- /dev/null +++ b/src/Message/TestMessage.php @@ -0,0 +1,12 @@ +