I finally took the time to test Apache Pinot
I’ve been wanting to test Apache Pinot for a very long time and I finally took the time to do it!
First, a quick description of Pinot
Pinot is a real-time distributed OLAP datastore, purpose-built to provide ultra low-latency analytics, even at extremely high throughput. It can ingest directly from streaming data sources or batch data sources.
At the heart of the system is a columnar store, with several smart indexing and pre-aggregation techniques for low latency.
Pinot was built by engineers at LinkedIn and Uber and is designed to scale up and out with no upper bound. Performance always remains constant based on the size of your cluster and an expected query per second (QPS) threshold.
A Pinot cluster consists of the following elements:
- Pinot Controller: composed of Apache Helix (cluster management) and Apache Zookeeper (coordination), it is the central component of Pinot that will take care of cluster orchestration, replication, and state management of the various cluster components.
- Pinot Broker: receives client requests and routes their executions to one or more Pinot servers before returning a consolidated response.
- Pinot Server: stores segments (parts of a table) and executes queries. A server can be either real-time (for streaming data) or offline (for batched, immutable data).
- Pinot Minion: an optional component used to run background tasks within the cluster, for example data purging.
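As a side note, everything the console shows comes from the controller's REST API, so once a cluster is up you can also ask the controller directly what is in it. A minimal sketch, assuming the controller is exposed on localhost:9000 as everywhere else in this article (the endpoint paths come from the controller's Swagger UI and are worth double-checking for your version):

# List all instances registered in the cluster (controller, brokers, servers, minions)
curl -s "http://localhost:9000/instances"

# List the tables currently defined
curl -s "http://localhost:9000/tables"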
First launch
So let’s start at the beginning: launching Pinot locally! Since Apache Pinot is a distributed system with several components (Zookeeper, Pinot Controller, Pinot Broker, Pinot Server), I decided to use the all-in-one Docker image to test it locally, which seems to be the simplest way.
Based on the Getting Started guide, I run Pinot with the following Docker command, which starts it with a pre-imported baseball stats dataset. After instantiating the components and importing the data, the container executes a set of queries and displays their results in the logs.
docker run \
  -p 9000:9000 \
  apachepinot/pinot:0.9.3 QuickStart \
  -type batch
After starting Pinot and importing the dataset, I can use the Pinot console (available on port 9000 by default) to access the cluster.
This allows you to view the status of the cluster via the Cluster Management tab.
And launch queries via the Query Console tab. I run a count(*) on the created baseballStats table; the query executes almost immediately (a few milliseconds), but then again the table only has 97889 rows, so that is expected.
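The console is not the only way in: the same query can be sent over HTTP to a broker. A minimal sketch, assuming the broker's default port 8099 is reachable (the quickstart command above only maps port 9000, so you would need to add -p 8099:8099); the /query/sql endpoint is the standard broker SQL endpoint, to be double-checked for your version:

curl -s -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from baseballStats"}'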
Analysis of European carbon emissions
Alright, that’s nice, but to get past Hello World and really test Pinot, I’m going to need a bigger dataset to play with.
The European Commission provides a set of Open Data, now is the time to take advantage of it.
I choose a dataset on greenhouse gas emissions in the Eurozone (you can find it on this page): Air emissions accounts by NACE Rev. 2 activity, about 4 million data points.
Here is the description of the dataset: This data set reports the emissions of greenhouse gases and air pollutants broken down by 64 industries (classified by NACE Rev. 2) plus households. Concepts and principles are the same as in national accounts. Complete data starts from reference year 2008.
We will now have to load the data: each column is separated by a tab, missing data is marked with :, and the first column combines the NACE category values, so it would be great to split it into four columns; their labels are airpol,nace_r2,unit,geo\time.
To send this data to Pinot, I relied on the guide Pushing your data to Pinot.
To start, you have to define a schema for the data, here is the one I used:
{ "schemaName": "greenhouseGazEmission", "dimensionFieldSpecs": [ { "name": "airpol", "dataType": "STRING" }, { "name": "nace_r2", "dataType": "STRING" }, { "name": "unit", "dataType": "STRING" }, { "name": "geo", "dataType": "STRING" } ], "metricFieldSpecs": [ { "name": "2020", "dataType": "FLOAT" }, { "name": "2019", "dataType": "FLOAT" }, [...] // repeate the same for all fields down to 1995 ] }
In this schema, we define two types of fields: dimension fields, which are strings on which we will be able to filter or group the data, and metric fields, here all floats (one field per year of available data), on which we will be able to run calculations (aggregations).
You must then define the table where the data will be stored. Here is the definition of the table; it is for the moment very straightforward:
{ "tableName": "greenhouseGazEmission", "segmentsConfig" : { "replication" : "1", "schemaName" : "greenhouseGazEmission" }, "tableIndexConfig" : { "invertedIndexColumns" : [], "loadMode" : "MMAP" }, "tenants" : { "broker":"DefaultTenant", "server":"DefaultTenant" }, "tableType":"OFFLINE", "metadata": {} }
To be able to create this table, I restarted a Pinot cluster from the Docker Compose file proposed in the documentation, then used the following Docker command, which launches a table creation command with the schema and table definitions created previously:
docker run --rm -ti \
  --network=pinot_default \
  -v ~/dev/pinot/data:/tmp/pinot-quick-start \
  --name pinot-batch-table-creation \
  apachepinot/pinot:0.9.3 AddTable \
  -schemaFile /tmp/pinot-quick-start/greenhousGazEmission-schema.json \
  -tableConfigFile /tmp/pinot-quick-start/greenhousGazEmission-table.json \
  -controllerHost manual-pinot-controller \
  -controllerPort 9000 -exec
We can then check via the Pinot console (http://localhost:9000) that the table has been created.
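The same check can be done from the command line against the controller REST API; a quick sketch (endpoints taken from the controller's Swagger UI, worth verifying for your Pinot version):

# List all tables known to the cluster
curl -s "http://localhost:9000/tables"

# Fetch the config of the table we just created, and the schema it references
curl -s "http://localhost:9000/tables/greenhouseGazEmission"
curl -s "http://localhost:9000/schemas/greenhouseGazEmission"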
Now, time to insert the data!
The file is a TSV and contains : when there is no data. To prepare it for ingestion into Pinot, we will execute a sed command that transforms it into a CSV (replacing the tabs with commas, which also splits the combined first column into four separate columns) and deletes certain incorrect characters. This sed command also alters the header line of field names, which must then be changed back to the names expected by the schema.
sed 's/\t/,/g;s/://g;s/[pseb]//g;s/[[:blank:]]//g' env_ac_ainah_r2.tsv > data/env_ac_ainah_r2.csv
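Before going further, it is worth checking the header line of the generated CSV, since the character cleanup above also strips letters from the column names. A quick sanity check, nothing more:

# Compare the header line before and after the transformation;
# the column names must end up matching the schema (airpol, nace_r2, unit, geo, then the years)
head -n 1 env_ac_ainah_r2.tsv
head -n 1 data/env_ac_ainah_r2.csv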
The ingestion into Pinot is done via an ingestion job defined in YAML format.
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'greenhouseGazEmission'
  schemaURI: 'http://pinot-controller:9000/tables/greenhouseGazEmission/schema'
  tableConfigURI: 'http://pinot-controller:9000/tables/greenhouseGazEmission'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'
This job defines, among other things:
- jobType: SegmentCreationAndTarPush : the job will create table segments and push them. A segment is a partition of the table data; if your dataset is large, you will have to split the CSV file to be able to launch several jobs and obtain several segments.
- inputDirURI and includeFileNamePattern : define where to look for the CSV file(s) to load data from.
- recordReaderSpec : defines the CSV data format.
- tableSpec : defines the target table specification, the one we defined earlier.
To launch the job, you can use the following Docker command:
docker run --rm -ti \
  --network=pinot_default \
  -v ~/dev/pinot/data:/tmp/pinot-quick-start \
  --name pinot-data-ingestion-job \
  apachepinot/pinot:0.9.3 LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/job-ingest.yml
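Once the job has finished, you can check that the segments actually landed before opening the console; a small sketch against the controller REST API (paths from the Swagger UI, to be double-checked for your version):

# List the segments created for the table
curl -s "http://localhost:9000/segments/greenhouseGazEmission"

# Reported size of the table
curl -s "http://localhost:9000/tables/greenhouseGazEmission/size"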
After ingestion, we now have 266456 rows in our table, which we can then query from the Pinot console.
For example via the following query: select geo, sum(2019), sum(2020), sum(2021) from greenhouseGazEmission group by geo.
Pinot uses Apache Calcite for querying, so we can use ANSI SQL, which greatly simplifies writing queries. The above query will return the following results:
Since the dataset is quite small, queries run in milliseconds even though no index has been created.
Now let’s try the following query: select sum(2019), sum(2020), sum(2021) from greenhouseGazEmission where unit = 'G_HAB'. It runs in 24ms, and we can see the following information in the result output:
- numDocsScanned: 66614
- totalDocs: 266456
- numEntriesScannedInFilter: 266456
Pinot has therefore scanned 66614 documents out of 266456, i.e. the documents that correspond to the G_HAB unit. During the filter phase, however, it scanned 266456 entries: it did a full scan of the table.
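These statistics are not specific to the console: the broker attaches them as metadata to every query response, so they are also visible when querying over HTTP. A rough sketch, assuming the broker is reachable on its default port 8099:

curl -s -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select sum(2019), sum(2020), sum(2021) from greenhouseGazEmission where unit = '\''G_HAB'\''"}'
# Next to the resultTable, the JSON response contains fields such as
# numDocsScanned, totalDocs, numEntriesScannedInFilter and timeUsedMs.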
Optimizing queries with indexes
Pinot allows you to add indexes to optimize queries; in the query used previously, the unit column was used to filter the data.
So I’m going to modify the table structure to add an inverted index on the unit column.
To do this, we can modify the table definition to add this index:
{ "tableName": "greenhouseGazEmission", "tableIndexConfig" : { "invertedIndexColumns" : ["unit"], "loadMode" : "MMAP" }, [...] }
For simplicity, I used the Pinot GUI to add the index (by editing the table) and then reload the segments. Reloading segments after a table change is necessary for the index to be created and updated.
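If you prefer to script it rather than click through the GUI, the controller REST API can do both steps: push the updated table config, then trigger a segment reload. A hedged sketch, where greenhouseGazEmission-table.json is a hypothetical local copy of the updated table definition and the endpoint paths come from the controller's Swagger UI (worth double-checking on 0.9.3):

# Update the offline table config with the new invertedIndexColumns entry
curl -s -X PUT "http://localhost:9000/tables/greenhouseGazEmission" \
  -H "Content-Type: application/json" \
  -d @greenhouseGazEmission-table.json

# Reload all segments of the table so the inverted index gets built on existing data
curl -s -X POST "http://localhost:9000/segments/greenhouseGazEmission_OFFLINE/reload"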
After relaunching the query, we see that numEntriesScannedInFilter drops to 0: Pinot is now using the newly created index.
One of Pinot's strengths is that it supports many different types of indexes, which makes it possible to cover different use cases and to trade query speed against storage, since each index uses disk space.
To go further on Pinot's indexing capabilities, you can read my article: Apache Pinot and its various types of indexes.