Using SingleStore Pipelines With Kafka, Part 3

Abstract

This article is the third and final part of our Pipeline series. We'll look at replacing the consumer portion of the Producer-Consumer application with a compelling feature of SingleStore called Pipelines.

The SQL scripts, Java code, and notebook files used in this article series are available on GitHub. Notebook files are available in DBC, HTML, and iPython formats.

Introduction

This is a three-part article series, organized as follows:

  1. Load the sensor data into SingleStore.
  2. Demonstrate a Producer-Consumer application using Java and JDBC.
  3. Demonstrate SingleStore Pipelines.

This third article covers Part 3, Demonstrating SingleStore Pipelines.

SingleStore Pipelines

Pipelines allow us to create streaming ingest feeds from different sources, such as Apache Kafka™, Amazon S3, and HDFS, using a single command. Using pipelines, we can perform ETL operations:

  1. Extract. Pull data from different sources without the need for additional middleware.
  2. Transform. Map and enrich the data using transforms (see the sketch after this list).
  3. Load. Guarantee delivery of messages and eliminate duplicates.
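For the Transform step specifically, a pipeline can pass each batch of records through an external program using the optional WITH TRANSFORM clause. The pipeline we build for our use case below does not need a transform, but a minimal sketch looks like this (the transform script URL and pipeline name are placeholders, not part of this article's example):

CREATE PIPELINE IF NOT EXISTS kafka_with_transform AS
LOAD DATA KAFKA '{{ BROKER_ENDPOINT }}/temp'
-- each batch is piped through the script via stdin/stdout before being loaded
WITH TRANSFORM ('http://example.com/transform.py', '', '')
INTO TABLE temperatures
FORMAT CSV
FIELDS TERMINATED BY ',';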

Visually, Figure 1 shows the architecture of our SingleStore pipelines.

Figure 1. SingleStore and Kafka Using Pipelines.

For our use case, we can create a simple pipeline in SingleStore as follows:

USE sensor_readings;

CREATE PIPELINE IF NOT EXISTS kafka_confluent_cloud AS
LOAD DATA KAFKA '{{ BROKER_ENDPOINT }}/temp'
CONFIG '{
   "security.protocol" : "SASL_SSL",
   "sasl.mechanism" : "PLAIN",
   "sasl.username" : "{{ CLUSTER_API_KEY }}"}'
CREDENTIALS '{
   "sasl.password" : "{{ CLUSTER_API_SECRET }}"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE temperatures
FORMAT CSV
FIELDS TERMINATED BY ',';

We need to replace {{ BROKER_ENDPOINT }}, {{ CLUSTER_API_KEY }}, and {{ CLUSTER_API_SECRET }} with the values for our Kafka cluster. We covered how to find these in the previous article in this series.

We specify the temp topic name on line 2 of the pipeline code. We also specify that we are using the CSV format and that the comma character separates the fields.
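Before starting the pipeline, we can optionally dry-run it. TEST PIPELINE reads a small sample of records from the source and displays them without writing anything to the table, which is a quick way to confirm that the broker endpoint and credentials are correct:

TEST PIPELINE kafka_confluent_cloud LIMIT 1;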

Next, we need to start the pipeline. This can be done as follows:

START PIPELINE kafka_confluent_cloud;
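If we later need to pause ingestion, the pipeline can be stopped (and subsequently restarted) with:

STOP PIPELINE kafka_confluent_cloud;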

We can check the status of the pipeline using, for example, the SHOW PIPELINES command:
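SHOW PIPELINES;

This lists the pipelines defined in the current database along with their current state, such as Running or Stopped. More detailed, per-batch metrics are available in the information_schema.PIPELINES_BATCHES_SUMMARY table.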

The pipeline we have defined and started will ingest message data directly from Confluent Cloud into our SingleStore database. As we can see, the architecture is simplified compared to the JDBC-based approach.
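As a quick sanity check, we can confirm that messages are arriving by counting the rows in the target table; running this query a few times while the pipeline is running should show the count increasing:

USE sensor_readings;

SELECT COUNT(*) FROM temperatures;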

Example queries

Now that we've built our system, we can start running queries, such as finding the sensors whose temperature readings fall within a certain range:

USE sensor_readings;

SELECT sensorid, COUNT(*)
FROM temperatures
WHERE temp > 70 AND temp < 100
GROUP BY sensorid
ORDER BY sensorid;

or finding the maximum temperature reported by each sensor within a given range of latitude and longitude coordinates:

USE sensor_readings;

SELECT MAX(temp) AS max_temp, sensorid
FROM temperatures AS t
JOIN sensors AS s ON t.sensorid = s.id
WHERE s.latitude >= 24.7433195 AND s.latitude <= 49.3457868 AND
      s.longitude >= -124.7844079 AND s.longitude <= -66.9513812
GROUP BY sensorid
ORDER BY max_temp DESC;

We can also use SingleStore’s geospatial features to find the land masses where the sensors are located:

USE sensor_readings;

SELECT continents.name AS continent, sensors.name AS sensor_name
FROM continents
JOIN sensors
ON GEOGRAPHY_CONTAINS(continents.geo, sensors.location)
ORDER BY continents.name, sensors.name;

Summary

Pipelines are a compelling feature of SingleStore. We implemented only a small example, but we can immediately see the benefits of a simplified architecture. The main benefits of pipelines include:

  • Fast parallel loading of data into a database.
  • Elimination of duplicate records during real-time data ingestion.
  • Streamlined design eliminates the need for additional middleware.
  • Extensible plug-in framework that allows customizations.
  • Exactly-once semantics, essential for enterprise data.

More details about pipelines can be found on the SingleStore Documentation website.

