A long time symfony enthusiast from the early days
What's an object ?
In programming, asynchronous events are those occurring independently of the main program flow. Asynchronous actions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing http://msdn2.microsoft.com/en-us/library/7ch3stsw.aspx
Architecture scale
PHP cannot fork or use thread
Boss: A client need a web service to generate a PDF from any url.
Awesome Guy: OK, no problem, I can use wkhtmltopdf to render any url, it is one hour job!
Boss: Awesome!
<?php public function generateAction(Request $request, $url) { $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid(sha1($url), true)); $this->get('knp_snappy.pdf')->generate($url, $tmpfname); $response = new Response(file_get_contents($tmpfname), 200, array( 'Content-Type' => 'application/pdf', 'Content-Disposition' => sprintf('attachment; filename="%s.pdf"', Inflector::tableize($url)) )); unlink($tmpfname); return $response; }
Boss: We need to find a solution => Use a cache !!!
Awesome Guy: OK!
<?php public function generateAction(Request $request, $url) { $uniqid = sha1($url); $manager = $this->get('sonata.media.manager.media'); $media = $manager->findOneBy(array('uniqid' => $uniqid)); if (!$media) { $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid($uniqid, true)); $this->get('knp_snappy.pdf')->generate($url, $tmpfname); $media = $manager->create(); $media->setUniqid($uniqid); $manager->save($media, 'screeny', 'sonata.media.provider.file'); } return $this->get('sonata.media.pool') ->getProvider('sonata.media.provider.file') ->getDownloadResponse($media, 'reference', 'http', array( 'Content-Disposition' => sprintf('attachment; filename="%s"', Inflector::tableize($media->getName()).'.pdf'), )); }
Boss: argh!!! We need to buy more servers!
Awesome Guy: maybe we can try another strategy using asynchronous messages
Boss: huuuu ! what?!
Pattern used in the SonataNotificationBundle
# install SonataNotificationBundle composer require sonata-project/notification-bundle # edit config.yml sonata_notification: backend: sonata.notification.backend.rabbitmq backends: rabbitmq: exchange: router queue: messages connection: host: 127.0.0.1 user: guest pass: guest port: 5672 vhost: /screening_master # install rabbitmq apt-get install rabbitmq-server
<?php class UrlShotManager { public function shot($url) { $uniqid = sha1($url); $media = $this->mediaManager->findOneBy(array('uniqid' => $uniqid)); if (!$media) { $tmpfname = sprintf("%s/screeny_output_%s", sys_get_temp_dir(), uniqid($uniqid, true)); $this->generator->generate($url, $tmpfname); $this->mediaManager->save($media, 'screeny', 'sonata.media.provider.file'); } return $media; } }
<?php class UrlShotConsumer implements ConsumerInterface { public function process(ConsumerEvent $event) { $url = $event->getMessage()->getValue('url'); if (!$url) { return; } try { $this->manager->shot($url); } catch(\Exception $e) { } } }
<?php public function generateAction(Request $request, $url) { $this->get('sonata.notification.backend') ->createAndPublish('url_shot', array( 'url' => $url )); return new JsonResponse(array('status' => 'OK')); }
What if the logic becomes more complex:
UrlShotConsumer
<?php class UrlShotConsumer implements ConsumerInterface { public function process(ConsumerEvent $event) { $url = $event->getMessage()->getValue('url'); if (!$url) { return; } try { $media = $this->manager->shot($url); $this->logger->info(sprintf("OK - The PDF has been generated - media.id = %d - $url", $media->getId(), $url)); $this->mailer->send("[email protected]", "The PDF has been generated", "message"); $this->metric->inc('pdf.generated.success') } catch(\Exception $e) { $this->logger->error(sprintf("ERROR - The PDF has not been generated - media.id = %d - $url", $media->getId(), $url)); $this->mailer->send("[email protected]", "The PDF has not been generated", "message"); $this->metric->inc('pdf.generated.error') } } }
... is open source message broker software (i.e., message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP) standard.
The RabbitMQ server is written in Erlang and is built on the Open Telecom Platform framework for clustering and failover
Exchange types: direct, fanout and topic
note: for more information, please review http://www.rabbitmq.com/tutorials/amqp-concepts.html
Integrates more RabbitMQ features inside Symfony2,
maintained by Alvaro Videla
- Co author of RabbitMQ in Action and
co author of php-amqplib -
old_sound_rabbit_mq: producers: call_shot: connection: default exchange_options: {name: 'shot_direct', type: direct} consumers: generate_shot: connection: default exchange_options: {name: 'shot_direct', type: direct} queue_options: {name: 'generate_shot'} callback: sonata_screeny.old_sound_rabbit_mq.generate_shot generate_shot_log: connection: default exchange_options: {name: 'shot_direct', type: direct} queue_options: {name: 'generate_shot_log'} callback: sonata_screeny.old_sound_rabbit_mq.generate_shot_log generate_shot_mail: connection: default exchange_options: {name: 'shot_direct', type: direct} queue_options: {name: 'generate_shot_mail'} callback: sonata_screeny.old_sound_rabbit_mq.generate_shot_mail
# Start the PDF consumer app/console rabbitmq:consumer generate_shot --route=shot_direct.start # Start the log consumer app/console rabbitmq:consumer generate_shot_log --route=shot_direct.start # Start the confirmation email consumer app/console rabbitmq:consumer generate_shot_mail --route=shot_direct.completed
<?php $con = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/test_payload'); $ch = $connection->channel(); $ch->queue_declare('super_bus', false, true, false, false); $ch->exchange_declare('starting_point', 'direct', false, true, false); $ch->queue_bind('super_bus', 'starting_point'); while (1) { $body = json_encode(array( 'command' => 'foobar', 'sleep' => 1200, 'body' => str_repeat(uniqid(), rand(1, 500)), )); $amq = new AMQPMessage($body, array( 'content_type' => 'text/plain', 'delivery-mode' => 2 )); $channel->basic_publish($amq, 'starting_point'); }
<?php $conn = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/test_payload'); $ch = $conn->channel(); $ch->queue_declare('super_bus', false, true, false, false); $ch->exchange_declare('starting_point', 'direct', false, true, false); $ch->queue_bind('super_bus', 'starting_point'); $ch->basic_consume('super_bus', 'pong_command', false, false, false, false, function(AMQPMessage $message) { echo "Payload size" + strlen($message->body) + "\n"; $message ->delivery_info['channel'] ->basic_ack($message->delivery_info['delivery_tag']); }); // Loop as long as the channel has callbacks registered while (count($chan->callbacks)) { $channel->wait(); }