Companies want to react quickly to the need of processing and sharing large amounts of data in real time to gain insights and create more engaging customer experiences. So, traditional data processing is no longer viable in today’s world.
To achieve that, you need to process a lot of data as fast as possible and then send it to other services for more processing. But in the middle of all these quick actions, it's necessary to notify consumers when the event occurs—and we can do just that using event streaming.
This is the repo in GitHub that we will be using.
Events
Before talking about event streaming, let’s talk about what an event is. An event that happens within an application can be related to a user process or simply actions that affect the business.
Events represent a state change, not the question of how to modify the application. Consider these as examples:
- A user logging into a service
- A payment transaction
- A writer publishing a post in a blog
In most cases, an event will trigger more events; for example, when a user signs up for a service, the app sends a notification to their device, inserts the record in the database, and sends a welcoming email.
Event streaming
Event Streaming is a pattern for capturing data in real time from event sources such as databases. The main parts of event streaming are as follows:
- Broker: The system in charge of storing events
- Topic: A category of events
- Producer: Sends events to a broker on a specific topic
- Consumer: Reads the events
- Events: Data that producers want to communicate to consumers
It is inevitable to talk about publish and subscribe architecture pattern (pub/sub pattern) at this point; event streaming is an implementation of that pattern but with these changes:
- Events occur instead of messages.
- Events are ordered, typically by time.
- Consumers can read events from a specific point in the topic.
- The events have temporal durability.
The flow starts when the producer publishes a new event into a topic (as we saw previously, the topic is just the categorization for a specific type of event). Then, consumers interested in events of a particular category subscribe to that topic. Finally, the broker identifies the consumers of the topic and makes the desired events available.
Advantages of event streaming
Decoupling There's no dependency between publishers and consumers because they don't need to know each other. In addition, the events don't specify their actions, so many consumers could get the same event and perform different actions.
Low Latency Events are decoupled and let the consumer utilize them anytime; it can happen in milliseconds.
Independence As we know, publishers and consumers are independent, so different teams can work with them using the same events for other actions or purposes.
Fault Tolerance Some event streaming platforms help you deal with consumer failures; for example, consumers can save their position and start from there again if an error occurs.
Real-Time Handling Feedback is received in real time, so the users don't need to wait minutes or hours to see the response of their events.
High Performance Event platforms can handle many messages due to the low latency—for example, thousands of events in a second.
Disadvantages of event streaming
Monitoring Some event streaming tools don't have a complete monitoring tool; they call for additional tools to be implemented, such as Datadog or New Relic.
Configuration The configuration in some tools can be overwhelming even for experienced people. There are many parameters, and sometimes, you need to know in depth about the subject to implement them.
Client libraries It isn't easy to implement Kafka in languages other than Java. Sometimes, the client libraries are not up to date, show instability, or don't offer many alternatives to choose from.
One of the most popular tools for event streaming is Apache Kafka. This tool allows users to send, store, and request data whenever and wherever they need it; let's talk about it.
Apache Kafka
"Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications."
Being specifically designed for real-time log transmission, Apache Kafka is ideal for applications that require the following:
- Reliable data exchange between different components
- The ability to divide messaging workloads as application requirements change
- Real-time transmission for data processing
Let's use Kafka in a Rails application!
Using Kafka with Rails
The most famous gem to use Kafka in Ruby is called ruby-kafka by Zendesk, and it is great! Still, you need to do all the implementation manually, which is why we have some "frameworks" built with ruby-kafka. They also help us with all the configuration and execution steps.
Karafka is a framework used to simplify Apache Kafka-based Ruby applications development.
To work with Kafka, it is necessary to install Java. Because Kafka is a Scala and Java application also, installing Zookeeper will be required.
Before the installation, I want to explain a bit about Zookeeper. Zookeeper is a centralized service essential for Kafka; it sends notifications in case of changes such as the creation of a new topic, crash of a broker, removal of a broker, deletion of topics, and so on.
Its main task is to manage Kafka brokers, maintain a list with their respective metadata, and facilitate health-checking mechanisms. In addition, it helps to select the leading broker for different partitions of the topics.
Requirements
For MacOS:
Now, let's install Java and Zookeeper with the following commands:
brew install java
brew install zookeeper
Then, we can continue installing Kafka running this:
brew install kafka
Once we have Kafka and Zookeeper installed, it's necessary to start the services this way:
brew services start zookeeper
brew services start kafka
For Windows and Linux:
Instructions:
Setting Up Rails
Just create a simple Rails application as usual:
rails new karafka_example
and add the karafka gem within the Gemfile:
gem 'karafka'
Then, run bundle install
to install the gem recently added, and don't forget to run the following command to get all the Karafka things:
bundle exec karafka install
That command should generate some interesting files: the first one is karafka.rb
in the root directory, app/consumers/application_consumer.rb
, and app/responders/application_responder.rb
.
Karafka Initializer
The karafka.rb
file is like an initializer application separated from Rails config. It allows you to configure the Karafka application and draw some routes, similar in terms of API as Rails application routes. But here, it’s for topics and consumers.
Producer
The producer is in charge of creating the events, and we can add them into the app/responders
folder. Now, let’s make a simple producer for users:
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
Consumer
The consumer is responsible for reading all the events/messages sent from the producer. This is just a consumer that logs the received message.
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
We use params
to get the event. But if you'll read events in batches and you have the config config.batch_fetching
as true, you should use params_batch
.
Testing
To run our Karafka service (the one that will be hearing the events), go to the console, open a new tab, go to the Rails project, and run:
bundle exec karafka server
Successful Event
Now, open another console tab, go to the Rails project, and type this:
rails c
There, let’s create an event with our responder:
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
If you check the Rails console, we will receive this message after the event is created:
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
And in the Karafka service tab, you’ll see something like this:
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
But if you just want the message payload, you can add params.payload
in your consumer and you will have something like this:
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
Failed Event
You can create a User model with some attributes like email
, first_name
and last_name
running the following command:
rails g model User email first_name last_name
Then, you can run the migration with this:
rails db:migrate
Now, add some validations like this:
class User < ApplicationRecord
validates :email, uniqueness: true
end
Finally, we can change the consumer:
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
So, let's create two events with the same email:
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )
With this, the first event is created in the database:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "1"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
But the second one will fail, because we have a validation that says the email is unique. If you try to add another record with an existing email, you will see something like this:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms) SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2 [["email", "batman@mail.com"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms) ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
You can see the error in the last line ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
. But the interesting thing here is that Kafka will try to process the event, again and again. Even if you restart the Karafka server, it will try to process the last event. How does Kafka know where to start?
If you see your console, after the error, you will see this:
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
It will tell you which offset was processed: in this case, it was offset 42. So, if you restart the Karafka service, it will start in that offset.
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
It will still fail because we have the email validation in our User model. At this point, stop the Karafka server, remove or comment that validation, and start your server again; you’ll see how the event is processed successfully:
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "2"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
Finally, you can see this message in the last line: Marking users/0:43 as processed
.
Callbacks
This is something cool that Karafka offers: you can use callbacks in your Consumer. To do that, you only need to import the module and use them. Then, open your UserConsumer
and add this:
class UsersConsumer < ApplicationConsumer
include Karafka::Consumers::Callbacks
before_poll do
Karafka.logger.info "*** Checking something new for #{topic.name}"
end
after_poll do
Karafka.logger.info '*** We just checked for new messages!'
end
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
Poll is the medium through which we fetch records based on the current partition offset. So, those callbacks before_poll
and after_poll
, like their name suggests, are executed at that moment. We are just logging a message, and you can see them in your Karafka server—one before fetching and the other one after that:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
Heartbeats
A heartbeat is just the way we, as consumers, say to Kafka we are alive; otherwise, Kafka will assume that the consumer is dead.
In Karafka, we have a default config to do this in a period of time; it is kafka.heartbeat_interval
and the default is 10 seconds. You can see this heartbeat in your Karafka server.
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
With Sending heartbeat...
, Kafka knows that we are alive and we are a valid member of its consumer group. Also, we can consume more records.
Commit
Marking an offset as consumed is called committing an offset. In Kafka, we record offset commits by writing to an internal Kafka topic called the offsets topic. A message is considered consumed only when its offset is committed to the offsets topic.
Karafka has a config to carry out this commit automatically each time; the config is kafka.offset_commit_interval
, and its value is 10 seconds by default. With this, Karakfa will do an offset commit every 10 seconds, and you can view that message in your Karafka server:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
The Committing offsets: users/0:44
tell us which offset it’s committing; in my case, it told Kafka that it can commit the offset number 44 from topic 0. In this way, if something happens with our service, Karafka can start again to process events from that offset.
Conclusion
Event streaming helps us to be faster, to make better use of data, and to design better user experiences. As a matter of fact, many companies are using this pattern to communicate all their services and to be able to react to different events in real time. As I mentioned before, there are other alternatives apart from Karafka that you can use with Rails. You already have the basics; now, feel free to experiment with them.