Apache Pinot et de ses différents types d’indexes
Il y a quelques temps, j’avais enfin pris le temps de tester Apache Pinot, vous pouvez trouver le récit de mes premières expérimentations ici.
Apache Pinot est un datastore OLAP (OnLine Analytical Processing) distribué et temps réel, spécialement conçu pour fournir des analyses à très faible latence, même à un débit extrêmement élevé. Si vous ne le connaissez pas, commencez par lire mon article d’introduction avant celui-ci.
Une des forces de Pinot est ses différents types d’indexes, ce sont ceux-ci que nous allons explorer dans cet article.
Le jeu de données Chicago Crimes
Nous allons utiliser le jeu de données Chicago Crimes de Google Big Query que nous allons exporter en CSV. Pour récupérer ce jeu de données, il faut aller dans l’interface de BigQuery, puis rechercher le projet publique bigquery-public-data
, puis la table chicago_crime
; naviguer vers cette URL devrait faire la même chose: https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=chicago_crime.
Vous devez ensuite copier cette table dans un ensemble de données de vôtre projet GCP, puis l’exporter en CSV.
Cela vous donnera 6 CSV d’environ 250Mo chacun, donc 1,5Go de données à analyser.
Ce jeu de donnée contient les données de crimes de la ville de Chicago sur plusieurs années; sa description peut être trouvée ici : https://console.cloud.google.com/marketplace/details/city-of-chicago-public-data/chicago-crime?filter=solution-type:dataset.
Une fois les CSV récupéré, il va falloir définir un schéma et une table, puis créer un Job pour importer les données.
Voici le schéma que nous allons utiliser :
{ "schemaName": "chicagoCrimes", "dimensionFieldSpecs": [ { "name": "unique_key", "dataType": "LONG" }, { "name": "case_number", "dataType": "STRING" }, { "name": "block", "dataType": "STRING" }, { "name": "iucr", "dataType": "STRING" }, { "name": "primary_type", "dataType": "STRING" }, { "name": "description", "dataType": "STRING" }, { "name": "location_description", "dataType": "STRING" }, { "name":"district", "dataType": "INT" }, { "name": "ward", "dataType": "INT" }, { "name": "community_area", "dataType": "INT" }, { "name": "fbi_code", "dataType": "STRING" }, { "name": "year", "dataType": "INT" }, { "name": "location", "dataType": "STRING" }, { "name": "arrest", "dataType": "BOOLEAN" }, { "name": "domestic", "dataType": "BOOLEAN" }, { "name": "beat", "dataType": "BOOLEAN" } ], "metricFieldSpecs": [ { "name": "x_coordinate", "dataType": "FLOAT" }, { "name": "y_coordinate", "dataType": "FLOAT" }, { "name": "latitude", "dataType":"FLOAT" }, { "name": "longitude", "dataType": "FLOAT" } ], "dateTimeFieldSpecs": [ { "name": "date", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z", "granularity": "1:SECONDS" }, { "name": "updated_on", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z", "granularity": "1:SECONDS" } ] }
Et voici la table associée, celle-ci ne contient pour l’instant aucun indexe :
{ "tableName": "chicagoCrimes", "segmentsConfig" : { "replication" : "1", "schemaName" : "chicagoCrimes" }, "tableIndexConfig" : { "invertedIndexColumns" : [], "loadMode" : "MMAP" }, "tenants" : { "broker":"DefaultTenant", "server":"DefaultTenant" }, "tableType":"OFFLINE", "metadata": {} }
Pour ingérer les données vous pouvez utiliser le job suivant :
executionFrameworkSpec: name: 'standalone' segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner' segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner' segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner' jobType: SegmentCreationAndTarPush inputDirURI: '/tmp/pinot-quick-start' includeFileNamePattern: 'glob:**/*.csv' outputDirURI: '/tmp/pinot-quick-start/segments/' overwriteOutput: true pinotFSSpecs: - scheme: file className: org.apache.pinot.spi.filesystem.LocalPinotFS recordReaderSpec: dataFormat: 'csv' className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader' configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig' tableSpec: tableName: 'chicagoCrimes' schemaURI: 'http://pinot-controller:9000/tables/chicagoCrimes/schema' tableConfigURI: 'http://pinot-controller:9000/tables/chicagoCrimes' pinotClusterSpecs: - controllerURI: 'http://pinot-controller:9000'
Nous allons maintenant démarrer un cluster Pinot, le plus simple est d’utiliser le Docker Compose fournit dans la documentation de Pinot.
Pour démarrer le cluster via Docker Compose, il faut lancer la commande docker compose up
. Après quelques minutes, vous aurez un cluster Pinot de démarré dont l’interface est accessible à l’URL http://localhost:9000.
L’image Pinot permet de lancer des jobs de création de table ou d’ingestion de données, les commandes suivantes partent du principe que les ressources nécessaires sont dans un répertoire ~/dev/pinot/crime
qui contient :
- Le schéma de la table dans un fichier
chicagoCrimes-schema.json
- La configuration de la table dans un fichier
chicagoCrimes-table.json
- Les 6 CSV de données dans un répertoire
data
Pour créer la table vous pouvez utiliser la commande docker suivante :
docker run --rm -ti \ --network=pinot_default \ -v ~/dev/pinot/crime:/tmp/pinot-quick-start \ --name pinot-batch-table-creation \ apachepinot/pinot:0.10.0 AddTable \ -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \ -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table.json \ -controllerHost manual-pinot-controller \ -controllerPort 9000 -exec
Pour ingérer les données vous pouvez utiliser la commande docker suivante :
docker run --rm -ti \ --network=pinot_default \ -v ~/dev/pinot/crime:/tmp/pinot-quick-start \ --name pinot-data-ingestion-job \ apachepinot/pinot:0.10.0 LaunchDataIngestionJob \ -jobSpecFile /tmp/pinot-quick-start/job-ingest.yml
Après ingestion, nous aurons 6452716 lignes dans notre table.
Les performances sans indexe
Pour tester les performances de Pinot sans aucun indexe, nous allons lancer quelques requêtes depuis la console d’administration de Pinot :
select count(*) from chicagoCrimes select year, count(*) from chicagoCrimes where arrest = true group by year select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year select year, count(*) from chicagoCrimes where ward=45 group by year select year, sum(community_area) from chicagoCrimes group by year
Constatation : les requêtes s’exécutent en quelques (dizaines de) millisecondes sur un jeu de données de 1,5Go et 6,5 millions de lignes.
Les segments prennent eux 476Mo sur disque.
Le secret de cette bonne performance sans indexe est que chaque champ est stocké dans un Forward Index, par défaut de type dictionnaire pour les colonnes dimension sinon raw value.
Dictionary-encoded forward index with bit compression
Dans un forward index de type dictionnaire, un identifiant est attribué à chaque valeur unique d’une colonne, et un dictionnaire est construit pour associer l’identifiant à la valeur. Le forward index stocke les identifiants compressés en bits. Si vous avez peu de valeurs uniques, le codage par dictionnaire peut améliorer considérablement l’efficacité de stockage de l’indexe.
Source : https://docs.pinot.apache.org/basics/indexing/forward-index.
Les indexes de Pinot
Inverted index
Dans un index inversé, une entrée (un mapping) est créée pour chaque valeur du champ. Cette entrée va contenir la liste des documents qui contiennent cette valeur.
Par exemple, pour les documents suivants :
Document IDs | primary_type |
---|---|
1 | MURDER |
2 | MURDER |
3 | DRUGS |
Vous obtiendrez l’index inversé suivant :
primary_type | Document IDs |
---|---|
MURDER | 1, 2 |
DRUGS | 3 |
Bloom Filter
Un Bloom Filter permet d’exclure les segments qui ne contiennent aucun enregistrement correspondant à un prédicat d’ÉGALITÉ.
Range index
Identique à un index inversé, mais va créer une entrée dans l’index (un mapping) pour une plage (range) de valeur au lieu d’en créer une pour chaque valeur.
Permet d’économiser de l’espace pour les colonnes ayant beaucoup de valeurs distinctes.
Pour les colonnes de types TIMESTAMP
, un index dédié existe et sert le même but : le Timestamp index.
Star-tree
La structure de données star-tree offre un compromis configurable entre le stockage et la latence et nous permet d’atteindre une limite supérieure stricte pour les latences des requêtes pour un cas d’utilisation donné.
Source : https://docs.pinot.apache.org/basics/indexing/star-tree-index.
Un index de type star-tree va pré-calculer et stocker une ou plusieurs pré-agrégations.
Il va utiliser une structure de données en arbre pour pré-calculer des sous-agrégations en fonction d’une dimension à certains nœuds de l’arbre.
Au requêtage, Pinot va ensuite sélectionner les nœuds prenant part à la requête et agréger les sous-agrégation de ces nœuds.
Un index star-tree est une structure en arbre qui contient les types de nœuds suivants :
- Root Node (Orange) : Nœud racine unique, à partir duquel le reste de l’arbre peut être parcouru.
- Leaf Node (Blue) : Peut contenir au plus T enregistrements, où T est configurable.
- Non-leaf Node (Green) : Les nœuds avec plus de T enregistrements sont ensuite divisés en nœuds enfants.
- Star-Node (Yellow) : Star-Node (nœud étoile), contient les enregistrements pré-agrégés après suppression de la dimension sur laquelle les données ont été fractionnées pour ce niveau.
- Dimensions Split Order ([D1, D2]) : Liste ordonnée de dimensions utilisée pour déterminer la dimension à fractionner pour un niveau donné dans l’arborescence.
Les résultats
Pour tester les différents types d’indexes, nous allons créer une table chicagoCrimesWithIdx
avec un ensemble d’indexe et exécuter les mêmes requêtes sur la table avec indexes et celle sans pour comparer les performances.
La définition de table suivante ré-utilise le schéma chicagoCrimes
mais ajoute des indexes sur certains champs.
{ "tableName": "chicagoCrimesWithIdx", "segmentsConfig" : { "replication" : "1", "schemaName" : "chicagoCrimes" }, "tableIndexConfig" : { "invertedIndexColumns" : ["primary_type"], "bloomFilterColumns": ["ward"], "rangeIndexColumns": ["x_coordinate"], "starTreeIndexConfigs": [{ "dimensionsSplitOrder": ["year"], "skipStarNodeCreationForDimensions": [], "functionColumnPairs": ["SUM__community_area"], "maxLeafRecords": 1 }], "loadMode" : "MMAP" }, "tenants" : { "broker":"DefaultTenant", "server":"DefaultTenant" }, "tableType":"OFFLINE", "metadata": {} }
Vous pouvez créer la table via la ligne de commande Docker suivante :
docker run --rm -ti \ --network=pinot_default \ -v ~/dev/pinot/crime:/tmp/pinot-quick-start \ --name pinot-batch-table-creation \ apachepinot/pinot:0.10.0 AddTable \ -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \ -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table-with-idx.json \ -controllerHost manual-pinot-controller \ -controllerPort 9000 -exec
Et charger les données dans la table via la ligne de commande Docker suivante, le job job-ingest-with-idx.yml
est identique au job job-ingest.yml
sauf qu’il utilise la nouvelle description de table :
docker run --rm -ti \ --network=pinot_default \ -v ~/dev/pinot/crime:/tmp/pinot-quick-start \ --name pinot-data-ingestion-job \ apachepinot/pinot:0.10.0 LaunchDataIngestionJob \ -jobSpecFile /tmp/pinot-quick-start/job-ingest-with-idx.yml
Inverted index
select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year
Index O/N | timeUsedMs | numDocsScanned | numEntriesScannedInFilter | numEntriesScannedPostFilter |
---|---|---|---|---|
Sans indexe | 25ms | 636118 | 6452716 | 636118 |
Avec indexe | 11ms | 636118 | O | 636118 |
On voit ici l’intérêt d’un indexe inversé : le filtre (la clause WHERE) n’a scanné aucune entrée car il a utilisé l’indexe.
Le temps d’exécution de la requête a été grandement amélioré passant de 25ms à 11ms.
Range index
select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year
Index O/N | timeUsedMs | numDocsScanned | numEntriesScannedInFilter | numEntriesScannedPostFilter |
---|---|---|---|---|
Sans indexe | 29ms | 990885 | 6452716 | 990885 |
Avec indexe | 11ms | 990885 | 641072 | 990885 |
On voit ici l’intérêt d’un indexe de type range : le filtre (la clause WHERE) a scanné beaucoup moins d’entrées car il a utilisé l’indexe même si l’index ayant une entrée par range il a quand même dû scanner une partie des entrée (10% ici).
Le temps d’exécution de la requête a été grandement amélioré passant de 29ms à 11ms.
Bloom filter
select year, count(*) from chicagoCrimesWithIdx where ward=45 group by year
Un Bloom filter va filtrer les segments (segment pruning) à traiter, bien qu’ici nous ayons 6 segments, ceux-ci étant créer sans aucune logique de filtre (segment technique), le Bloom filter n’aura aucun effet.
Pour pouvoir utiliser un Blomm filter sur le champs ward, il aurait fallut découper les segments sur la valeur de ce champs (un segment par plage de valeur par exemple).
Star-tree index
select year, sum(community_area) from chicagoCrimes group by year
Index O/N | timeUsedMs | numDocsScanned | numEntriesScannedInFilter | numEntriesScannedPostFilter |
---|---|---|---|---|
Sans indexe | 31ms | 6452716 | 0 | 12905432 |
Avec indexe | 6ms | 132 | 0 | 264 |
Avec un index star-tree, des documents seront pré-calculés avec des pré-agrégations.
Au lieu de scanner les documents de la table, ce sont les documents de l’indexes qui auront été scannés; d’où le nombre de documents scanné de 132.
Cette requête n’a utilisé que les données de l’indexes et s’est donc exécuté très rapidement.
Le temps d’exécution de la requête a été grandement amélioré passant de 31ms à 6ms.
Conclusion
Sans indexe, les performances d’une requête Pinot sont déjà très bonnes grâce à son stockage optimisé des champs en forward index. L’ajout d’indexes spécifiques permet un gain de performance non négligeable même pour des requête balayant une grande partie des données de la table. Attention, les temps de requêtes données dans cet articles sont en exécution locale sur un jeu de données assez petit pour Pinot, ils ne peuvent donc être extrapolé à un vrai jeu de donnée sur un déploiement Pinot de production.
L’index de type star-tree permet de réaliser des pré-agrégation sans nécessiter un grand espace de stockage, les requêtes l’utilisant deviennent alors ultra-rapides car elles utilisent un petit nombre de documents pré-construits au lieu de nécessiter un scan total des segments. C’est pour moi l’index le plus intéressant et novateur offert par Pinot.