Apache Pinot et de ses différents types d’indexes

Apache Pinot et de ses différents types d’indexes

Il y a quelques temps, j’avais enfin pris le temps de tester Apache Pinot, vous pouvez trouver le récit de mes premières expérimentations ici.

Apache Pinot est un datastore OLAP (OnLine Analytical Processing) distribué et temps réel, spécialement conçu pour fournir des analyses à très faible latence, même à un débit extrêmement élevé. Si vous ne le connaissez pas, commencez par lire mon article d’introduction avant celui-ci.

Une des forces de Pinot est ses différents types d’indexes, ce sont ceux-ci que nous allons explorer dans cet article.

Le jeu de données Chicago Crimes

Nous allons utiliser le jeu de données Chicago Crimes de Google Big Query que nous allons exporter en CSV. Pour récupérer ce jeu de données, il faut aller dans l’interface de BigQuery, puis rechercher le projet publique bigquery-public-data, puis la table chicago_crime; naviguer vers cette URL devrait faire la même chose: https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=chicago_crime.

Vous devez ensuite copier cette table dans un ensemble de données de vôtre projet GCP, puis l’exporter en CSV.
Cela vous donnera 6 CSV d’environ 250Mo chacun, donc 1,5Go de données à analyser.

Ce jeu de donnée contient les données de crimes de la ville de Chicago sur plusieurs années; sa description peut être trouvée ici : https://console.cloud.google.com/marketplace/details/city-of-chicago-public-data/chicago-crime?filter=solution-type:dataset.

Une fois les CSV récupéré, il va falloir définir un schéma et une table, puis créer un Job pour importer les données.

Voici le schéma que nous allons utiliser :

{
  "schemaName": "chicagoCrimes",
  "dimensionFieldSpecs": [
    {
      "name": "unique_key", "dataType": "LONG"
    },
    {
      "name": "case_number", "dataType": "STRING"
    },
    {
      "name": "block", "dataType": "STRING"
    },
    {
      "name": "iucr", "dataType": "STRING"
    },
    {
      "name": "primary_type", "dataType": "STRING"
    },
    {
      "name": "description", "dataType": "STRING"
    },
    {
      "name": "location_description", "dataType": "STRING"
    },
    {
      "name":"district", "dataType": "INT"
    },
    {
      "name": "ward", "dataType": "INT"
    },
    {
      "name": "community_area", "dataType": "INT"
    },
    {
      "name": "fbi_code", "dataType": "STRING"
    },
    {
      "name": "year", "dataType": "INT"
    },
    {
      "name": "location", "dataType": "STRING"
    },
    {
      "name": "arrest", "dataType": "BOOLEAN"
    },
    {
      "name": "domestic", "dataType": "BOOLEAN"
    },
    {
      "name": "beat", "dataType": "BOOLEAN"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "x_coordinate", "dataType": "FLOAT"
    },
    {
      "name": "y_coordinate", "dataType": "FLOAT"
    },
    {
      "name": "latitude", "dataType":"FLOAT"
    },
    {
      "name": "longitude", "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "date",
      "dataType": "STRING",
      "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z",
      "granularity": "1:SECONDS"
    },
    {
      "name": "updated_on",
      "dataType": "STRING",
      "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z",
      "granularity": "1:SECONDS"
    }
  ]
}

Et voici la table associée, celle-ci ne contient pour l’instant aucun indexe :

{
  "tableName": "chicagoCrimes",
  "segmentsConfig" : {
    "replication" : "1",
    "schemaName" : "chicagoCrimes"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : [],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

Pour ingérer les données vous pouvez utiliser le job suivant :

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'chicagoCrimes'
  schemaURI: 'http://pinot-controller:9000/tables/chicagoCrimes/schema'
  tableConfigURI: 'http://pinot-controller:9000/tables/chicagoCrimes'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'

Nous allons maintenant démarrer un cluster Pinot, le plus simple est d’utiliser le Docker Compose fournit dans la documentation de Pinot.

Pour démarrer le cluster via Docker Compose, il faut lancer la commande docker compose up. Après quelques minutes, vous aurez un cluster Pinot de démarré dont l’interface est accessible à l’URL http://localhost:9000.

L’image Pinot permet de lancer des jobs de création de table ou d’ingestion de données, les commandes suivantes partent du principe que les ressources nécessaires sont dans un répertoire ~/dev/pinot/crime qui contient :

  • Le schéma de la table dans un fichier chicagoCrimes-schema.json
  • La configuration de la table dans un fichier chicagoCrimes-table.json
  • Les 6 CSV de données dans un répertoire data

Pour créer la table vous pouvez utiliser la commande docker suivante :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:0.10.0 AddTable \
    -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table.json \
    -controllerHost manual-pinot-controller \
    -controllerPort 9000 -exec

Pour ingérer les données vous pouvez utiliser la commande docker suivante :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:0.10.0 LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/job-ingest.yml

Après ingestion, nous aurons 6452716 lignes dans notre table.

Les performances sans indexe

Pour tester les performances de Pinot sans aucun indexe, nous allons lancer quelques requêtes depuis la console d’administration de Pinot :

select count(*) from chicagoCrimes

select year, count(*) from chicagoCrimes where arrest = true group by year
 
select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year

select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year

select year, count(*) from chicagoCrimes where ward=45 group by year

select year, sum(community_area) from chicagoCrimes group by year

Constatation : les requêtes s’exécutent en quelques (dizaines de) millisecondes sur un jeu de données de 1,5Go et 6,5 millions de lignes.

Les segments prennent eux 476Mo sur disque.

Le secret de cette bonne performance sans indexe est que chaque champ est stocké dans un Forward Index, par défaut de type dictionnaire pour les colonnes dimension sinon raw value.

Dictionary-encoded forward index with bit compression

Dans un forward index de type dictionnaire, un identifiant est attribué à chaque valeur unique d’une colonne, et un dictionnaire est construit pour associer l’identifiant à la valeur. Le forward index stocke les identifiants compressés en bits. Si vous avez peu de valeurs uniques, le codage par dictionnaire peut améliorer considérablement l’efficacité de stockage de l’indexe.

Source : https://docs.pinot.apache.org/basics/indexing/forward-index.

Les indexes de Pinot

Inverted index

Dans un index inversé, une entrée (un mapping) est créée pour chaque valeur du champ. Cette entrée va contenir la liste des documents qui contiennent cette valeur.

Par exemple, pour les documents suivants :

Document IDs primary_type
1 MURDER
2 MURDER
3 DRUGS

Vous obtiendrez l’index inversé suivant :

primary_type Document IDs
MURDER 1, 2
DRUGS 3

Bloom Filter

Un Bloom Filter permet d’exclure les segments qui ne contiennent aucun enregistrement correspondant à un prédicat d’ÉGALITÉ.

Range index

Identique à un index inversé, mais va créer une entrée dans l’index (un mapping) pour une plage (range) de valeur au lieu d’en créer une pour chaque valeur.

Permet d’économiser de l’espace pour les colonnes ayant beaucoup de valeurs distinctes.

Pour les colonnes de types TIMESTAMP, un index dédié existe et sert le même but : le Timestamp index.

Star-tree

La structure de données star-tree offre un compromis configurable entre le stockage et la latence et nous permet d’atteindre une limite supérieure stricte pour les latences des requêtes pour un cas d’utilisation donné.

Source : https://docs.pinot.apache.org/basics/indexing/star-tree-index.

Un index de type star-tree va pré-calculer et stocker une ou plusieurs pré-agrégations.

Il va utiliser une structure de données en arbre pour pré-calculer des sous-agrégations en fonction d’une dimension à certains nœuds de l’arbre.

Au requêtage, Pinot va ensuite sélectionner les nœuds prenant part à la requête et agréger les sous-agrégation de ces nœuds.

Un index star-tree est une structure en arbre qui contient les types de nœuds suivants :

  • Root Node (Orange) : Nœud racine unique, à partir duquel le reste de l’arbre peut être parcouru.
  • Leaf Node (Blue) : Peut contenir au plus T enregistrements, où T est configurable.
  • Non-leaf Node (Green) : Les nœuds avec plus de T enregistrements sont ensuite divisés en nœuds enfants.
  • Star-Node (Yellow) : Star-Node (nœud étoile), contient les enregistrements pré-agrégés après suppression de la dimension sur laquelle les données ont été fractionnées pour ce niveau.
  • Dimensions Split Order ([D1, D2]) : Liste ordonnée de dimensions utilisée pour déterminer la dimension à fractionner pour un niveau donné dans l’arborescence.

Les résultats

Pour tester les différents types d’indexes, nous allons créer une table chicagoCrimesWithIdx avec un ensemble d’indexe et exécuter les mêmes requêtes sur la table avec indexes et celle sans pour comparer les performances.

La définition de table suivante ré-utilise le schéma chicagoCrimes mais ajoute des indexes sur certains champs.

{
  "tableName": "chicagoCrimesWithIdx",
  "segmentsConfig" : {
    "replication" : "1",
    "schemaName" : "chicagoCrimes"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : ["primary_type"],
    "bloomFilterColumns": ["ward"],
    "rangeIndexColumns": ["x_coordinate"],
    "starTreeIndexConfigs": [{
      "dimensionsSplitOrder": ["year"],
      "skipStarNodeCreationForDimensions": [],
      "functionColumnPairs": ["SUM__community_area"],
      "maxLeafRecords": 1
    }],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

Vous pouvez créer la table via la ligne de commande Docker suivante :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:0.10.0 AddTable \
    -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table-with-idx.json \
    -controllerHost manual-pinot-controller \
    -controllerPort 9000 -exec

Et charger les données dans la table via la ligne de commande Docker suivante, le job job-ingest-with-idx.yml est identique au job job-ingest.yml sauf qu’il utilise la nouvelle description de table :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:0.10.0 LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/job-ingest-with-idx.yml

Inverted index

select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year
Index O/N timeUsedMs numDocsScanned numEntriesScannedInFilter numEntriesScannedPostFilter
Sans indexe 25ms 636118 6452716 636118
Avec indexe 11ms 636118 O 636118

On voit ici l’intérêt d’un indexe inversé : le filtre (la clause WHERE) n’a scanné aucune entrée car il a utilisé l’indexe.
Le temps d’exécution de la requête a été grandement amélioré passant de 25ms à 11ms.

Range index

select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year
Index O/N timeUsedMs numDocsScanned numEntriesScannedInFilter numEntriesScannedPostFilter
Sans indexe 29ms 990885 6452716 990885
Avec indexe 11ms 990885 641072 990885

On voit ici l’intérêt d’un indexe de type range : le filtre (la clause WHERE) a scanné beaucoup moins d’entrées car il a utilisé l’indexe même si l’index ayant une entrée par range il a quand même dû scanner une partie des entrée (10% ici).
Le temps d’exécution de la requête a été grandement amélioré passant de 29ms à 11ms.

Bloom filter

select year, count(*) from chicagoCrimesWithIdx where ward=45 group by year

Un Bloom filter va filtrer les segments (segment pruning) à traiter, bien qu’ici nous ayons 6 segments, ceux-ci étant créer sans aucune logique de filtre (segment technique), le Bloom filter n’aura aucun effet.

Pour pouvoir utiliser un Blomm filter sur le champs ward, il aurait fallut découper les segments sur la valeur de ce champs (un segment par plage de valeur par exemple).

Star-tree index

select year, sum(community_area) from chicagoCrimes group by year
Index O/N timeUsedMs numDocsScanned numEntriesScannedInFilter numEntriesScannedPostFilter
Sans indexe 31ms 6452716 0 12905432
Avec indexe 6ms 132 0 264

Avec un index star-tree, des documents seront pré-calculés avec des pré-agrégations.
Au lieu de scanner les documents de la table, ce sont les documents de l’indexes qui auront été scannés; d’où le nombre de documents scanné de 132.
Cette requête n’a utilisé que les données de l’indexes et s’est donc exécuté très rapidement.
Le temps d’exécution de la requête a été grandement amélioré passant de 31ms à 6ms.

Conclusion

Sans indexe, les performances d’une requête Pinot sont déjà très bonnes grâce à son stockage optimisé des champs en forward index. L’ajout d’indexes spécifiques permet un gain de performance non négligeable même pour des requête balayant une grande partie des données de la table. Attention, les temps de requêtes données dans cet articles sont en exécution locale sur un jeu de données assez petit pour Pinot, ils ne peuvent donc être extrapolé à un vrai jeu de donnée sur un déploiement Pinot de production.

L’index de type star-tree permet de réaliser des pré-agrégation sans nécessiter un grand espace de stockage, les requêtes l’utilisant deviennent alors ultra-rapides car elles utilisent un petit nombre de documents pré-construits au lieu de nécessiter un scan total des segments. C’est pour moi l’index le plus intéressant et novateur offert par Pinot.

Laisser un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.