Introduction to Kestra
Kestra is an open-source data orchestrator and scheduler. With Kestra, data workflows, called flows, use the YAML format and are executed by its engine via an API call, the user interface, or a trigger (webhook, schedule, SQL query, Pub/Sub message, …).
The key concepts in Kestra are:
- The flow: describes how the data will be orchestrated (in other words, the workflow). It is a sequence of tasks.
- The task: a step in the flow that performs an action on one or more pieces of incoming data, the inputs, and then generates zero or one piece of output data, the output.
- The execution: an instance of a flow, created following a trigger.
- The trigger: defines how a flow can be triggered.
We will see these concepts in practice in the rest of this article.
To launch Kestra locally, the easiest way is to use the provided Docker Compose file and run it with docker compose up.
This Docker Compose file runs Kestra in standalone mode (all components in a single process) with a PostgreSQL database as queue and repository (for storing and executing flows), and local storage (for storing flow data).
Kestra is composed of several components that communicate via queues and store flow information via repositories. These queues and repositories can be implemented in different ways (memory, JDBC, Kafka, Elasticsearch). I’m not going into detail about Kestra’s architecture today, but if you are curious you can refer to Kestra’s architecture documentation.
Once Kestra is started, a graphical interface is available on port 8080: http://localhost:8080. All the examples in this article are run from this interface.
At the first launch, a Guided Tour is offered. You can follow it or skip it, then continue with the examples in this article.
My first flow
To create a flow, go to the Flows menu and then click on the Create button at the bottom right. Now you have a textarea in which you will be able to enter the YAML description of the flow.
id: hello-world
namespace: fr.loicmathieu.example
tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello World"
A flow has:
- An id property, which is its unique identifier within a namespace.
- A namespace property; namespaces are hierarchical, like directories in a file system.
- A tasks property, which is the list of tasks to be performed when the flow is executed.
Here we have added a single task that has three properties:
- An id property, which is its unique identifier within the flow.
- A type property, which is the name of the class that represents the task.
- A format property, which is specific to Echo tasks and defines the format of the message to be logged (an Echo task is like the echo shell command).
Each task will have its own properties, which are documented in the online documentation as well as in the documentation integrated in the Kestra graphical interface (Documentation -> Plugins -> then select a plugin in the right menu).
Within the editor, the YAML description of the flow is validated and autocompletion is available for the type and properties of the tasks via CTRL+SPACE.
Kestra is based on a plugin system that allows you to add tasks (as well as triggers and conditions, which we will see later). By default, very few plugins are bundled with Kestra, but the Docker image used in the Docker Compose file includes all the plugins maintained by the Kestra development team. Their documentation is here: https://kestra.io/plugins/.
To launch this first flow, go to the Executions tab and click the New Execution button. This switches to the Gantt view of the flow execution, which is updated in real time with the status of the execution.
The Logs tab allows you to see the runtime logs.
Note the Hello World log generated by the hello task.
Now let’s add an input named name to our flow, to give it the name of the person to say hello to. To do this, we need to edit the flow: select it either via the breadcrumb navigation at the top (Home / Flows / fr.loicmathieu.example.hello-world), or by clicking on Flows and selecting it from the list; then go to the Source tab.
We can pass data to a flow via inputs. We will define a name input of type STRING.
id: hello-world
namespace: fr.loicmathieu.example
inputs:
  - type: STRING
    name: name
tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello {{inputs.name}}"
When we click on the New Execution button, a form asks us to fill in the flow inputs.
After entering a value for the name and running the flow, we can see that the log contains the value of the input.
Hello {{inputs.name}} is an expression that uses the Pebble templating engine, which renders whatever is between the {{ and }} moustaches. Here the Pebble expression fetches the value of the inputs.name variable, which points to the input named name.
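To make the substitution concrete, here is a toy Python stand-in for what the templating step does. It is not Pebble and supports only plain dotted paths; the render function and the sample value are made up for this illustration.

```python
import re


def render(template: str, variables: dict) -> str:
    """Replace each {{ dotted.path }} with the value found in `variables`.

    Toy illustration only: Pebble itself supports far more (filters,
    functions, subscript notation, ...).
    """
    def lookup(match: re.Match) -> str:
        value = variables
        # Walk the dotted path one key at a time: inputs -> name
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)


# Hypothetical input value, as if typed into the execution form
rendered = render("Hello {{inputs.name}}", {"inputs": {"name": "Alice"}})
print(rendered)
```

The point is only that inputs.name is a path into a tree of variables that the engine resolves at execution time.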
Inputs and BigQuery
In this example, the flow will take a CSV file as input, load it into Google BigQuery, then run a query on the loaded data and display the result in the logs.
I’m using BigQuery here because it’s easier than starting a database, but it requires a Google Cloud account. You need to create a Google Cloud service account and pass it in the serviceAccount property of the BigQuery tasks. To avoid hard-coding this value, you can use an environment variable or a cluster global variable; see the online documentation on variables.
Without further ado, the solution 😉
id: beer
namespace: fr.loicmathieu.example
description: A flow to handle my beers data
inputs:
  - type: FILE
    description: Beers data
    name: beers
tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: load-beers
    type: io.kestra.plugin.gcp.bigquery.Load
    serviceAccount: "<service account key>"
    csvOptions:
      fieldDelimiter: ","
    destinationTable: beers.beers
    format: CSV
    from: "{{inputs.beers}}"
  - id: query-beers
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    fetchOne: true
    sql: |
      SELECT count(*) as count FROM beers.beers
  - id: end-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{outputs['query-beers'].row.count}} beers loaded"
Here I have defined an input of type FILE named beers. Kestra detects the type of the input and generates the corresponding flow execution form, which lets you upload a CSV file containing a list of beers. This file is stored in Kestra’s internal storage and is available in all tasks.
The load-beers task of type io.kestra.plugin.gcp.bigquery.Load is used to load a CSV file into a BigQuery table. The BigQuery dataset beers must have been created before this task is executed. The from property takes a file from the internal storage (more precisely, the URI of the file; the file itself is retrieved from the internal storage at the last moment). Here I use the Pebble expression {{inputs.beers}} to retrieve the file passed as input to the flow.
The query-beers task of type io.kestra.plugin.gcp.bigquery.Query is used to run a BigQuery query. There are several ways to store the query result. Here, I use the fetchOne: true property, which configures the task to fetch a single row and put the result of the query in the task output. It is also possible to load all rows (fetch: true), or to store the results in Kestra’s internal storage (store: true), which is recommended for queries that return many rows.
The end-log task writes a log, as we have already seen. Here, we want to log the number of records loaded into the table, so we get the corresponding output from the query-beers task via a Pebble expression: {{outputs['query-beers'].row.count}}.
The expression {{outputs['query-beers'].row.count}} may seem intriguing:
- outputs['query-beers']: refers to the output of the query-beers task. So far we have seen the dotted notation (.) to access variables, but when a name contains a ‘-’ character we are forced to use the subscript notation ([]), because ‘-’ is a special character for Pebble.
- row: is the name of the attribute set as output by the query-beers task. A task can have multiple output attributes; refer to the task documentation for their list.
- count: is the name of the column.
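The same ambiguity exists in most expression languages, so a short Python sketch can illustrate it. This is an analogy, not Pebble: it assumes Pebble reads ‘-’ as the minus operator in the way Python does, and the outputs dictionary and its value 42 are made up for the example.

```python
import ast

# Hypothetical shape of the outputs variable after query-beers has run
outputs = {"query-beers": {"row": {"count": 42}}}

# Subscript notation: the task id is an ordinary string key
count = outputs["query-beers"]["row"]["count"]
print(count)

# Dotted notation would be parsed as a subtraction: (outputs.query) - beers
tree = ast.parse("outputs.query-beers", mode="eval").body
print(type(tree).__name__, type(tree.op).__name__)
```

Quoting the task id inside [] removes the ambiguity, which is why the flow uses outputs['query-beers'] rather than a dotted path.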
ForEach and file format
In this example, we will query the 10 most viewed Wikipedia pages using the BigQuery public dataset wikipedia.pageviews_2023 for French, English and German languages. Then we will transform the result into CSV.
id: wikipedia-top-ten
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour
tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: for-each-countries
    type: io.kestra.core.tasks.flows.EachSequential
    tasks:
      - id: query-top-ten
        type: io.kestra.plugin.gcp.bigquery.Query
        serviceAccount: "<service account key>"
        sql: |
          SELECT DATETIME(datehour) as date, title, views
          FROM `bigquery-public-data.wikipedia.pageviews_2023`
          WHERE DATE(datehour) = current_date() and wiki = '{{taskrun.value}}' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
          ORDER BY datehour desc, views desc
          LIMIT 10
        store: true
      - id: write-csv
        type: io.kestra.plugin.serdes.csv.CsvWriter
        from: "{{outputs['query-top-ten'][taskrun.value].uri}}"
    value: '["fr", "en", "de"]'
The for-each-countries task of type io.kestra.core.tasks.flows.EachSequential allows for looping. It runs its list of child tasks once for each of the values passed in its value property; here the languages fr, en and de.
The query-top-ten task of type io.kestra.plugin.gcp.bigquery.Query executes a query on BigQuery, whose result is stored in Kestra’s internal storage (store: true). It uses the Pebble expression {{taskrun.value}}, which retrieves the current value of the EachSequential loop.
The write-csv task of type io.kestra.plugin.serdes.csv.CsvWriter rewrites the file stored by the previous task in CSV format. By default, Kestra stores data in the Amazon Ion format, so this task converts from Ion to CSV. It uses the Pebble expression {{outputs['query-top-ten'][taskrun.value].uri}}, in which [taskrun.value].uri retrieves the value of the uri attribute for the current loop iteration.
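As a rough mental model of the loop, the Python sketch below mimics how each iteration’s output ends up keyed by the loop value. The two functions are hypothetical stand-ins for the tasks and the URIs are invented; this is not how Kestra is implemented.

```python
def query_top_ten(wiki: str) -> dict:
    # Hypothetical stand-in for the BigQuery task with store: true.
    # It returns the (made-up) URI of a file in internal storage.
    return {"uri": f"kestra:///query-top-ten/{wiki}.ion"}


def write_csv(source_uri: str) -> dict:
    # Hypothetical stand-in for CsvWriter: same file, converted to CSV.
    return {"uri": source_uri.replace(".ion", ".csv")}


outputs = {"query-top-ten": {}, "write-csv": {}}

# EachSequential: run the child tasks once per value, in order
for value in ["fr", "en", "de"]:
    outputs["query-top-ten"][value] = query_top_ten(value)
    # Equivalent of {{outputs['query-top-ten'][taskrun.value].uri}}
    outputs["write-csv"][value] = write_csv(outputs["query-top-ten"][value]["uri"])

print(outputs["write-csv"]["en"]["uri"])
```

Because every iteration runs the same task ids, the extra [taskrun.value] level is what tells the outputs of the fr, en and de iterations apart.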
After executing the flow, you can go to the Outputs tab to access the task outputs and, among other things, download the CSV files generated by the flow.
Trigger
By default, a flow can only be run manually via the graphical interface or the Kestra API.
It is also possible to trigger a flow from an external event; this is the role of the trigger.
Kestra includes three basic triggers: flow, which triggers a flow from another flow; webhook, which triggers a flow from a webhook URL; and schedule, which triggers a flow periodically based on a cron expression. Many other triggers are available in Kestra’s plugins and let you trigger a flow from, for example, a message in a broker, a file, or the presence of a record in a database table.
The following example triggers a flow every minute. It uses a cron expression to define how often the flow is triggered.
id: flow-with-trigger
namespace: fr.loicmathieu.example
triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "*/1 * * * *"
tasks:
  - id: "echo"
    type: "io.kestra.core.tasks.debugs.Echo"
    format: "{{task.id}} > {{taskrun.startDate}}"
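In the cron expression above, the first field is the minute field and */1 means "every 1 minute". As a rough illustration of how a step value expands, here is a small Python sketch. The function name is hypothetical, it handles only the *, */N and single-number forms, and it is not how Kestra parses cron expressions.

```python
def expand_minute_field(field: str) -> list[int]:
    """Expand a cron minute field ('*', '*/N' or 'M') into the matching minutes."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        # Every `step` minutes, starting at minute 0
        return list(range(0, 60, step))
    return [int(field)]


every_minute = expand_minute_field("*/1")
every_quarter = expand_minute_field("*/15")
print(len(every_minute), every_quarter)
```

With */1 every minute of the hour matches, so the flow runs once a minute; replacing it with */15 would run the flow four times an hour.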
Data processing with Python and Pandas
Kestra offers advanced functionality via tasks that allow you to execute Bash, Python or NodeJS scripts.
The following example queries the same BigQuery dataset of Wikipedia page views, writes the result in CSV format, and then uses that CSV in an io.kestra.core.tasks.scripts.Python task, which allows you to run a Python script.
This task takes as properties:
- inputFiles: a list of files, which must contain the main.py file that will be called by the task. A second file, data.csv, is defined to give local access to the file created by the write-csv task. Kestra automatically retrieves it from its internal storage and makes it available in the working directory of the Python task.
- requirements: a list of pip dependencies; here we add the Pandas library, which is used to analyze the data in the CSV file.
id: wikipedia-top-ten-pyhton-panda
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour
tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    sql: |
      SELECT DATETIME(datehour) as date, title, views
      FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'fr' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs['query-top-ten'].uri}}"
  - id: "python"
    type: io.kestra.core.tasks.scripts.Python
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
      main.py: |
        import pandas as pd
        from kestra import Kestra
        data = pd.read_csv("data.csv")
        data.info()
        sumOfViews = data['views'].sum()
        Kestra.outputs({'sumOfViews': int(sumOfViews)})
    requirements:
      - pandas
The Python script will use Pandas to read the CSV file and transform it into a Pandas data frame, then perform the sum of the views column. This sum will then be put into the task output using the Python Kestra library.
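To check the summing logic outside Kestra, here is a minimal sketch that swaps Pandas for Python’s standard csv module so it runs without extra dependencies. The sample rows are invented and only mimic the date, title and views columns of the query; the final print stands in for the Kestra.outputs call.

```python
import csv
import io

# Made-up sample with the same columns as the query result
sample_csv = """date,title,views
2023-03-01T10:00:00,Page_A,120
2023-03-01T10:00:00,Page_B,80
"""

# Read the CSV into a list of dictionaries, one per row
rows = list(csv.DictReader(io.StringIO(sample_csv)))

# Same computation as data['views'].sum() in the flow's script
sum_of_views = sum(int(row["views"]) for row in rows)

# In the flow, this dictionary would be passed to Kestra.outputs(...)
print({"sumOfViews": sum_of_views})
```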
Here are the execution logs of this flow.
Conclusion
In this introductory article, we have seen the main concepts of Kestra and some flow examples.
To go further, you can check out the online documentation as well as the plugin list.
Kestra is an open-source community project available on GitHub. Feel free to:
- Put a star or open an issue on its repository.
- Follow Kestra on Twitter or LinkedIn.
- Contact the team on Slack.