Kategoriler
PHP RabbitMQ

RabbitMQ (PHP ile)

PHP ile RabbitMQ’nun kullanımına geçmeden önce RabbitMQ nedir? sorusuna kısaca cevap vereyim. RabbitMQ, AMQP (Advanced Message Queuing Protocol) protokolü üzerine inşa edilmiş açık kaynaklı bir mesaj kuyruğu (message queue) yazılımıdır. Erlang programlama dili ile yazılmıştır. Temel olarak, gönderen (producer) ve alıcı (consumer) uygulamalar arasındaki iletişimi kolaylaştırarak iş yükünü dengeler.

PHP ile RabbitMQ kullanımına geçmeden önce bize bir RabbitMQ servisi gerekiyor.

RabbitMQ’yu Docker Container Olarak Kurmak

Elbette bu işlem için bilgisayarınızda Docker kurulu olmalıdır. Terminal/Komut İstemcinizde aşağıdaki komutu yürüterek Docker’ın kurulu olup olmadığını test edebilirsiniz. Kurulu değilse docker.com sitesinden indirip kurabilirsiniz.

docker --version

Ardından aşağıdaki komut ile docker containerı olarak rabbitmq servisimizi kurup ayağa kaldırabiliriz.

docker run -d --name test_rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:latest

Yukarıdaki komut bizim için test_rabbitmq adında bir docker container oluşturacak ve RabbitMQ son sürümünü bizim için kuracaktır. 5672 portundan AMQP, 15672 (http://localhost:15672) portundan HTTP üzerinden yönetim ekranına erişebilirsiniz.

PHP ile RabbitMQ İletişimi

PHP ile RabbitMQ iletişimi sağlamak için ekstra bir php paketine ihtiyacımız var. Öncelikle php-amqplib/php-amqplib paketini composer yardımı ile projemize kurmamız gerekiyor.

composer require php-amqplib/php-amqplib

Not: Composer‘ın cihazınızda kurulu olması gerektiğini unutmayın. Ayrıca PHP’nin MB_String ve BCMath paketlerinin de kurulu ve aktif olması gerektiğini unutmayın.

Evet her şey hazırsa öncelikle gönderen (producer) ya da bir başka ifade ile publisherımızı PHP ile programlayalım.

producer.php

<?php
require_once(__DIR__ . '/vendor/autoload.php');

const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

$channel = $connection->channel();

// Kuyruk yoksa oluştur!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);

$jobId = 0;

while (true) {
    // İş Mesajının Metnini Oluşturalım
    $jobData = [
        "id"        => ++$jobId,
        "task"      => "sleep",
        "period"    => rand(0, 3),
    ];
    $job = json_encode($jobData, JSON_UNESCAPED_SLASHES);

    $message = new PhpAmqpLib\Message\AMQPMessage($job,  [
        'delivery_mode'     => 2, // Mesajı kalıcı yap
    ]);
    
    $channel->basic_publish($message, '', RABBITMQ_QUEUE);
    
    echo '[' . date("Y-m-d H:i:s") . '] Job created!' . PHP_EOL;
    sleep(1);
}

Yukarıdaki producer (gönderici) 1 saniyede bir yeni bir mesaj yani işi RabbitMQ kuyruğumuza gönderir. Producer’ı çalıştırmak için;

php producer.php

komutunu kullanabilirsiniz.

consumer.php

<?php
require_once(__DIR__ . '/vendor/autoload.php');

const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

$channel = $connection->channel();

// Kuyruk yoksa oluştur!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);


echo 'Mesajlar bekleniyor. Çıkmak için CTRL+C' . PHP_EOL;

$worker = function (object $message) {
    try {
        $job = json_decode($message->body, true);
        echo '[-] İşleniyor... #' . $job['id'] . PHP_EOL;
        sleep($job['period']);
        echo '[+] Bitti #' . $job['id'] . PHP_EOL;

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch (\Throwable $e) {
        echo '[x] Failed #' . $job['id'] . PHP_EOL;
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume(RABBITMQ_QUEUE, '', false, false, false, false, $worker);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

Yukarıdaki consumer.php dosyası da mesaj kuyruğunu dinleyerek yeni bir mesaj geldiğinde işleme alacaktır. Çalıştırmak için;

php consumer.php

komutunu yürütün.

Hepsi bu kadar! 🙂

Yazar Muhammet ŞAFAK

Bilgisayar Programcısı ve Back-end Yazılım Geliştirici

Bir Cevap Yazın

Muhammet ŞAFAK sitesinden daha fazla şey keşfedin

Okumaya devam etmek ve tüm arşive erişim kazanmak için hemen abone olun.

Okumaya Devam Edin