Learn how to process sensor data in real time, detect repeating patterns, and trigger alarms to prevent equipment damage and defects
Apache Flink® is a powerful framework that facilitates the seamless processing of massive amounts of data with low latency. Flink is a vital component of many streaming data systems and is also a versatile tool for many use cases, including fraud detection, rule-based alerting, anomaly detection, real-time IoT monitoring, traffic analytics, and more.
This post explores Flink’s capabilities for a real-time IoT monitoring system, where we demonstrate how to process sensor data in real time, detect repeating patterns, and trigger alarms to prevent equipment damage and defects.
But first, let’s start with the basics.
What is Apache Flink?
Flink is a distributed stream processing framework that excels at handling real-time data processing tasks. It offers a unique combination of features, including event-time processing, exactly-once processing guarantees, and efficient state management. It has a wide range of applications in different domains, making it a versatile choice for businesses and developers.
This tutorial demonstrates Flink's role in real-time IoT monitoring. Flink can ingest, process, and analyze sensor data in real time, helping you identify patterns that could lead to equipment damage or defects. It can also help you ensure the health and reliability of manufacturing equipment.
Let’s get started!
Tutorial: building a real-time equipment monitoring system with Redpanda and Flink
Before you begin building a real-time equipment monitoring system with Redpanda and Flink, you should have the following:
- Docker Desktop installed (version 4.24 or later)
- Redpanda (version 23.2.14 or later) and Redpanda Console, which you'll run on Docker
- The Apache Flink 1.17.1 Docker image
- The Flink Kafka connector JAR file (flink-sql-connector-kafka-1.17.1.jar) downloaded from this repository
- Python version 3.9 or later
Scenario: real-time IoT equipment monitoring
Before diving into the technical details, let's establish a scenario for the real-time IoT monitoring use case.
Business goal: You're tasked with ensuring the reliability and health of critical equipment in a manufacturing plant. Any unexpected failures can lead to costly downtimes, production delays, and, in some cases, safety concerns.
Objective: Implement a real-time equipment monitoring system that continuously collects data from IoT sensors attached to the equipment. The system must detect patterns that could indicate potential issues, such as overheating, unusual vibrations, or abnormal readings. Early detection will allow proactive maintenance and prevent equipment damage or defects.
Key metrics: The success of this system will be measured by the reduction in unplanned downtime, maintenance cost savings, and the number of equipment-related incidents.
Technical details: The following diagram illustrates the components involved in this real-time equipment monitoring system:
Rough architecture diagram
- IoT Sensors (1..n): These are the physical sensors attached to the manufacturing equipment. They continuously generate data, such as temperature, pressure, vibration, and more. This data is sent to the Redpanda topic for ingestion.
- Redpanda (Topic): Redpanda serves as your high-performance event streaming platform. Each type of sensor data corresponds to a specific topic in Redpanda, allowing you to segregate and process data efficiently.
- Apache Flink: Flink is responsible for processing the incoming data streams from Redpanda. It analyzes the data in real time to detect patterns and anomalies that may indicate equipment issues.
- Alerting System: In this scenario, when Apache Flink detects a pattern that could lead to equipment damage or defects, it triggers an alert. This alert can be sent to an external alerting system that notifies relevant personnel or systems for immediate action. The exact implementation of the alerting system can vary and is out of the scope of this tutorial. The alerting system might include tools like email notifications, SMS alerts, or integration with incident management systems.
1. Create a project directory
Let's move on to the implementation. First, create a project directory on your machine.
Inside your project directory, create a directory named flink-kafka-connector and copy the Flink Kafka connector JAR that you downloaded into it. Flink's streaming queries need this JAR to interact with Redpanda.
2. Start Redpanda and the Flink service
Create a docker-compose.yaml file in your project directory and paste in the corresponding code from the official Redpanda quick start guide. Then append the code below to define the Flink-related container services:
      flink-job-manager:
        container_name: flink-job-manager
        image: apache/flink:1.17.1
        ports:
          - "8081:8081"
        environment:
          - REDPANDA_HOST=redpanda-0:19092
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: flink-job-manager
        # Mount the Kafka connector JAR so that Flink can interact
        # with the Redpanda cluster.
        volumes:
          - ./flink-kafka-connector/flink-sql-connector-kafka-1.17.1.jar:/opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
        command: jobmanager
        networks:
          - redpanda_network
      flink-taskmanager:
        container_name: flink-taskmanager
        image: apache/flink:1.17.1
        depends_on:
          - flink-job-manager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: flink-job-manager
            taskmanager.numberOfTaskSlots: 2
        volumes:
          - ./flink-kafka-connector/flink-sql-connector-kafka-1.17.1.jar:/opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
        command: taskmanager
        networks:
          - redpanda_network
Next, open a terminal and run this command to start the Redpanda and Flink container services:
docker compose up -d
Once the containers are up and running, you should get a result similar to the following when you execute docker ps in the same terminal:
    CONTAINER ID   IMAGE                                                COMMAND                   CREATED          STATUS          PORTS                                                                                                              NAMES
    b2a7f63e6d35   docker.redpanda.com/redpandadata/console:v2.3.1      "/bin/sh -c 'echo \"$…"   21 seconds ago   Up 16 seconds   0.0.0.0:8080->8080/tcp                                                                                             redpanda-console
    801c998a2895   apache/flink:1.17.1                                  "/docker-entrypoint.…"    21 seconds ago   Up 17 seconds   6123/tcp, 8081/tcp                                                                                                 flink-taskmanager
    bb23cc15261b   docker.redpanda.com/redpandadata/redpanda:v23.2.14   "/entrypoint.sh redp…"    21 seconds ago   Up 17 seconds   8081-8082/tcp, 0.0.0.0:18081-18082->18081-18082/tcp, 9092/tcp, 0.0.0.0:19092->19092/tcp, 0.0.0.0:19644->9644/tcp   redpanda-0
    e526af3b857b   apache/flink:1.17.1                                  "/docker-entrypoint.…"    21 seconds ago   Up 19 seconds   6123/tcp, 0.0.0.0:8081->8081/tcp                                                                                   flink-job-manager
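If you'd rather check the published ports from a script than read the docker ps table, the following is a minimal sketch using only the Python standard library. The port numbers come from the output above. The names SERVICES, port_open, and check_services are made up for this illustration. Note that it only confirms that a port accepts TCP connections, not that the service behind it is healthy.

```python
import socket

# Host ports published by the containers in this tutorial (see docker ps above).
SERVICES = {
    "Redpanda Console": 8080,
    "Flink web UI": 8081,
    "Redpanda Kafka API": 19092,
}


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_services(host="localhost", services=SERVICES):
    """Map each service name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in services.items()}
```

Calling print(check_services()) after docker compose up -d should report True for each entry once the containers have finished starting.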
3. Create topics on Redpanda
You need to create two topics in your Redpanda cluster:
- temperature_sensor, to receive the events from the IoT simulator application that you'll create shortly
- notification_events, to register the notification/alerting events that result from the Flink streaming app's transformation process
To start, open a browser and visit http://localhost:8080/ to access the Redpanda Console application.
Here, you can see an overview and the health status of the Redpanda cluster. To create topics in the cluster, click Topics in the side menu and click Create Topic.
Fill in the topic name (temperature_sensor) in the on-screen prompt and click Create.
Repeat the same steps to create the notification_events topic. Once done, both topics should be listed under the "Create Topic" button.
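Alternatively, you can do all the above steps from the command line with rpk, a CLI tool for interacting with a Redpanda cluster, for example by running docker exec -it redpanda-0 rpk topic create temperature_sensor notification_events.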
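The topics can also be created from Python. The sketch below is one possible approach, not part of the original tutorial: it assumes the kafka-python library (installed in the next step) and the external listener at localhost:19092. The helper names topic_specs and create_topics are made up, and one partition with a replication factor of one is an assumption that suits a single-broker setup.

```python
TOPICS = ("temperature_sensor", "notification_events")


def topic_specs(names, partitions=1, replication=1):
    """Build keyword arguments for kafka-python's NewTopic, one dict per topic."""
    return [
        {"name": name, "num_partitions": partitions, "replication_factor": replication}
        for name in names
    ]


def create_topics(bootstrap="localhost:19092", names=TOPICS):
    """Create the topics on the cluster, skipping any that already exist."""
    # Imported here so the helper above can be used without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    try:
        for spec in topic_specs(names):
            try:
                admin.create_topics([NewTopic(**spec)])
            except TopicAlreadyExistsError:
                pass  # Safe to re-run the script.
    finally:
        admin.close()
```

Once the containers are running, calling create_topics() should leave both topics visible on the Topics screen of Redpanda Console.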
4. Create the IoT sensor data simulator app
Now that your topics are ready, let's go ahead and develop a Python app that simulates the IoT sensor data. This simulator app will send the temperature data to your first Redpanda topic, temperature_sensor.
Create a requirements.txt file in the project directory and paste in the line below:
kafka-python==2.0.2
The kafka-python library allows the Python app to interact with the Redpanda cluster. You can prepare the virtual environment required for this Python app by running the following commands in a command line terminal (keep this terminal open for later use):
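    python -m venv venv
    venv\Scripts\activate
    pip install -r requirements.txt
The activation command above is for Windows. On macOS or Linux, run source venv/bin/activate instead.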
Next, create a directory named iot-sensor-simulator in the project directory. Inside that directory, create a Python script called produce-sensor-data.py and paste in the code below:
    import json
    import random
    import time

    from kafka import KafkaProducer

    # Define the Redpanda topic and Kafka producer
    topic = "temperature_sensor"
    producer = KafkaProducer(
        bootstrap_servers='localhost:19092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )

    # Simulate temperature readings
    while True:
        # Generate a random temperature reading between 0°C and 100°C
        temperature = random.uniform(0, 100)

        # Create a JSON payload with a timestamp and temperature value
        data = {
            "timestamp": int(time.time()),
            "temperature": temperature
        }

        # Send the data to the Redpanda topic
        producer.send(topic, value=data)
        print(f"Sent data: {data}")

        # Simulate data every 5 seconds (adjust the interval as needed)
        time.sleep(5)
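To see what actually lands on the topic, it can help to look at the payload without a broker. The sketch below mirrors the simulator's payload and its value_serializer using only the standard library. The function names make_reading, serialize, and deserialize are made up for this illustration and do not appear in the simulator itself.

```python
import json
import random
import time


def make_reading(now=None, rng=random):
    """Build one reading with the same keys the simulator sends."""
    return {
        # Epoch seconds, as produced by int(time.time()) in the simulator.
        "timestamp": int(time.time() if now is None else now),
        # Uniformly distributed between 0 and 100 degrees Celsius.
        "temperature": rng.uniform(0, 100),
    }


def serialize(reading):
    """Encode a reading the same way the producer's value_serializer does."""
    return json.dumps(reading).encode("utf-8")


def deserialize(raw):
    """Decode message bytes back into a dict, as a consumer would."""
    return json.loads(raw.decode("utf-8"))
```

Each message value is therefore a small UTF-8 JSON object with the keys timestamp and temperature, which is the shape any consumer of temperature_sensor has to parse.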
5. Consume sensor data using a Flink streaming query
Your Python simulator app is now ready to produce temperature sensor data to the topic, so you can move on to create a consumer using Flink.
On Flink, you'll run a streaming query to consume data from the temperature_sensor topic and detect patterns. In this case, any temperature reading above 60°C will trigger a notification event to your second topic, notification_events.
Open a new terminal and connect to the Flink job manager's container terminal by executing the following command:
docker exec -it flink-job-manager bash
You'll use the Flink SQL Client, a CLI-based application, to submit table programs or Flink jobs to a Flink cluster. This allows you to interact with Flink's Table and SQL APIs. You'll use it to stream the data from the temperature_sensor topic and then produce the alert events to the notification_events topic. Start the sql-client.sh CLI by executing the following command in the Flink job manager's container terminal:
sql-client.sh embedded
You should see an output similar to the one below:
    Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

    Command history file path: /root/.flink-sql-history

    Flink SQL>
Next, execute the following statement at the Flink SQL prompt to define a table named temperature_sensor that streams data from the temperature_sensor topic:
    CREATE TABLE temperature_sensor (
        temperature DOUBLE,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'temperature_sensor',
        'properties.bootstrap.servers' = 'redpanda-0:9092',
        'properties.group.id' = 'cg_1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    );
The table has the following specifications:
- The table has two columns:
  - temperature is a column of type DOUBLE that stores temperature readings.
  - ts is a column of type TIMESTAMP(3) that stores the timestamp associated with each temperature reading. The JSON format maps message fields to columns by name, so this column is only filled when a message carries a field named ts in a form Flink can parse. The simulator in this tutorial sends its time as timestamp, in epoch seconds, and the query in this tutorial reads only temperature.
- The WATERMARK FOR ts AS ts - INTERVAL '5' SECOND line defines a watermark strategy for event-time processing. It says that any event whose timestamp (ts) is more than five seconds older than the maximum timestamp seen so far is considered late.
- The WITH clause specifies properties for connecting to a Kafka topic:
  - 'connector' = 'kafka' specifies that the connector type is Kafka.
  - 'topic' = 'temperature_sensor' specifies the Kafka topic name.
  - 'properties.bootstrap.servers' = 'redpanda-0:9092' specifies the address of the Kafka bootstrap server, here the Redpanda broker as seen from inside the Docker network.
  - 'properties.group.id' = 'cg_1' specifies the consumer group ID for the Kafka consumer.
  - 'scan.startup.mode' = 'earliest-offset' specifies that the Kafka consumer should start consuming from the earliest offset in the topic.
  - 'format' = 'json' specifies that message values are encoded as JSON.
In summary, this statement defines a table in Flink that reads data from a Kafka topic named temperature_sensor. The data is expected to be in JSON format and contains temperature readings along with their associated timestamps.
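The watermark rule described above can be illustrated in a few lines of plain Python. This is a simplified model of the description in this section, not Flink's implementation: the class name is made up, timestamps are plain epoch seconds, and Flink's handling of boundary cases and of when watermarks are emitted differs in detail.

```python
class BoundedOutOfOrdernessWatermark:
    """Tracks watermark = (maximum event time seen so far) - (allowed delay)."""

    def __init__(self, max_delay_seconds=5):
        self.max_delay = max_delay_seconds
        self.max_ts = None  # No events observed yet.

    @property
    def watermark(self):
        """Current watermark, or None before the first event."""
        if self.max_ts is None:
            return None
        return self.max_ts - self.max_delay

    def observe(self, ts):
        """Record an event time and return True if the event counts as late."""
        current = self.watermark
        # Late means more than max_delay seconds older than the newest event.
        late = current is not None and ts < current
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        return late
```

For example, after events at 100 and 103 seconds the watermark is 98, so an event stamped 97 is late while one stamped 98 is still accepted.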
Next, define a second table named notification_events in the same way, to produce the alerting or notification events to your target topic:
    CREATE TABLE notification_events (
        temperature DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'notification_events',
        'properties.bootstrap.servers' = 'redpanda-0:9092',
        'format' = 'json'
    );
Execute the INSERT statement below to select the temperature readings that exceed 60 and write the matching records as events to the target notification_events topic:
    INSERT INTO notification_events
    SELECT temperature
    FROM temperature_sensor
    WHERE temperature > 60;
You should see an output similar to the one below:
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 3549cc0def14134606bcebcff8fbb7ec
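To make the logic of the submitted job concrete, here is a rough per-message Python equivalent of the INSERT statement: it keeps only the temperature field and only when the reading is strictly above 60. This sketch is for reasoning about the rule and is not how Flink executes the query. THRESHOLD and to_notification are names invented for this example.

```python
import json

THRESHOLD = 60.0  # Same limit as the WHERE clause in the Flink query.


def to_notification(raw):
    """Return the JSON bytes for notification_events, or None if no alert is due."""
    reading = json.loads(raw)
    temperature = reading.get("temperature")
    # Mirrors WHERE temperature > 60: missing values and values <= 60 are dropped.
    if temperature is None or temperature <= THRESHOLD:
        return None
    # Mirrors SELECT temperature: only this one column is written out.
    return json.dumps({"temperature": temperature}).encode("utf-8")
```

A reading of exactly 60 produces no event because the comparison is strict, and the timestamp never reaches notification_events because the target table has only a temperature column.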
6. Run the IoT sensor data simulator
The Flink job has now been submitted and is waiting for incoming temperature sensor readings, so you can run the Python app that you created earlier to generate them. Switch back to the command line terminal where your Python virtual environment is still active and execute this command:
python iot-sensor-simulator\produce-sensor-data.py
You should see the generated temperature readings printed one by one:
    Sent data: {'timestamp': 1699105319, 'temperature': 53.57524954563365}
    Sent data: {'timestamp': 1699105324, 'temperature': 12.633941164102346}
    Sent data: {'timestamp': 1699105329, 'temperature': 25.797944478255065}
    Sent data: {'timestamp': 1699105334, 'temperature': 59.97157998845765}
    Sent data: {'timestamp': 1699105339, 'temperature': 97.28872564878496}
    Sent data: {'timestamp': 1699105344, 'temperature': 64.17179488745744}
    Sent data: {'timestamp': 1699105349, 'temperature': 75.17949439574046}
    Sent data: {'timestamp': 1699105354, 'temperature': 52.73660062477933}
    Sent data: {'timestamp': 1699105359, 'temperature': 94.62147761948029}
    …
The simulator app sends these events to the temperature_sensor topic.
7. View the events generated in Redpanda topics
Let's view the events created in both topics. Open the Redpanda Console again and go to the same Topics screen as before. Both topics should now contain data, and their sizes have grown: the simulator app is writing readings to temperature_sensor, and the Flink query is writing the matching readings to notification_events.
Click the temperature_sensor topic to view the data. Notice that this topic contains all the temperature readings that the simulator app has produced.
Similarly, open the notification_events topic to view its data. This topic contains only the temperature readings above 60, which the Flink streaming query produced.
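The same check can be done from a script instead of the console. The sketch below assumes the kafka-python library from step 4 and the external listener at localhost:19092. The function names parse_alert, all_above, and read_alerts are hypothetical helpers introduced here, and the limit and timeout defaults are arbitrary.

```python
import json


def parse_alert(raw):
    """Decode one notification_events message value into a temperature."""
    return float(json.loads(raw)["temperature"])


def all_above(temperatures, threshold=60.0):
    """True if every alert temperature is strictly above the threshold."""
    return all(t > threshold for t in temperatures)


def read_alerts(bootstrap="localhost:19092", limit=10, timeout_ms=10000):
    """Read up to `limit` alerts from the start of notification_events."""
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "notification_events",
        bootstrap_servers=bootstrap,
        auto_offset_reset="earliest",    # Start from the oldest message.
        consumer_timeout_ms=timeout_ms,  # Stop iterating when the topic goes quiet.
    )
    alerts = []
    try:
        for message in consumer:
            alerts.append(parse_alert(message.value))
            if len(alerts) >= limit:
                break
    finally:
        consumer.close()
    return alerts
```

With the simulator and the Flink job running, all_above(read_alerts()) should return True, matching what the console shows for this topic.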
Conclusion
In this tutorial, you learned how Flink can be used with Redpanda to build a real-time IoT monitoring system. You can use this knowledge to apply the power of Flink and Redpanda for real-time data processing and analysis, which opens up possibilities for various applications such as fraud detection, rule-based alerting, anomaly detection, and more.
With this foundation, you're well on your way to exploring more complex use cases and harnessing the full potential of these powerful tools!
The full code from this tutorial is available in this GitHub repository. To keep exploring Redpanda, check the documentation and browse the Redpanda blog for more tutorials. If you have any questions, ask away on the Redpanda Community on Slack.