Geek Culture

A new tech publication by Start it up.

Introducing Kestra: Finally a Viable Airflow Alternative?

12 min read · Jun 24, 2023



What is Kestra and Why Should You Care?

How to Install Kestra

curl -L -o docker-compose.yml http://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml
Image 1 — Kestra docker-compose file (image by author)
docker-compose up -d
Image 2 — Starting Kestra (image by author)
Image 3 — Kestra UI (image by author)
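Once `docker-compose up -d` returns, the containers may need a little while before the UI responds. Below is a minimal polling sketch using only Python's standard library. It assumes the UI is served on `http://localhost:8080`, the port mapping in Kestra's compose file around the time of writing; adjust the URL if yours differs.

```python
import http.client
import time
import urllib.error
import urllib.request


def wait_until_up(url="http://localhost:8080", timeout=60.0, interval=2.0):
    """Poll `url` until the server answers or `timeout` seconds pass.

    Returns True as soon as any non-5xx HTTP response arrives, False otherwise.
    Assumption: the Kestra UI listens on localhost:8080.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval):
                return True  # 2xx response (redirects are followed): the UI is serving pages
        except urllib.error.HTTPError as err:
            if err.code < 500:
                return True  # the server answered, even if with a client error
        except (OSError, http.client.HTTPException):
            pass  # connection refused, reset, or timed out: not up yet
        time.sleep(interval)
    return False


# Usage, right after `docker-compose up -d`:
# print("Kestra is up" if wait_until_up() else "Kestra did not start in time")
```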

How to Create Your First Kestra Flow

Image 4 — Your Kestra flows (image by author)
id: first-flow
namespace: dev
inputs:
  - name: firstname
    type: STRING
    defaults: User
    required: false
tasks:
  - id: hello-task
    type: io.kestra.core.tasks.log.Log
    message: Hello, {{ inputs.firstname }}
Image 5 — Writing the flow YML file (image by author)
Image 6 — Documentation split screen view (image by author)
Image 7 — Saving the flow (image by author)
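The `inputs` block means that an execution started without a `firstname` value falls back to `User`. The sketch below models that behavior in plain Python. `resolve_inputs` and `render` are hypothetical helpers, the regex understands only `{{ inputs.NAME }}`, and Kestra's real templating engine (Pebble) does far more. Treating an input as required unless `required: false` is set is an assumption.

```python
import re

# Hypothetical mirror of the `inputs` section of first-flow.
FLOW_INPUTS = [
    {"name": "firstname", "type": "STRING", "defaults": "User", "required": False},
]


def resolve_inputs(declared, provided):
    """Use the provided value, else the declared default, else fail if required."""
    resolved = {}
    for spec in declared:
        name = spec["name"]
        if name in provided:
            resolved[name] = provided[name]
        elif "defaults" in spec:
            resolved[name] = spec["defaults"]
        elif spec.get("required", True):  # assumption: required unless stated otherwise
            raise ValueError(f"missing required input: {name}")
    return resolved


def render(template, inputs):
    """Replace {{ inputs.NAME }} placeholders; a tiny stand-in for Pebble templating."""
    pattern = r"\{\{\s*inputs\.(\w+)\s*\}\}"
    return re.sub(pattern, lambda m: str(inputs[m.group(1)]), template)


print(render("Hello, {{ inputs.firstname }}", resolve_inputs(FLOW_INPUTS, {})))
# Hello, User
print(render("Hello, {{ inputs.firstname }}", resolve_inputs(FLOW_INPUTS, {"firstname": "Dario"})))
# Hello, Dario
```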

Run Your First Kestra Flow

Image 8 — Running the flow (image by author)
Image 9 — Flow execution Gantt view (image by author)
Image 10 — Flow execution Log view (image by author)
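Besides the Run button in the UI, flows can also be started over Kestra's REST API. The sketch below only builds such a request with Python's standard library and does not send it. The endpoint path `/api/v1/executions/trigger/{namespace}/{flowId}`, the multipart form encoding of inputs, and the `localhost:8080` address are assumptions based on Kestra releases from around the time of writing; newer versions may use a different path, so check the API reference for your version.

```python
import urllib.request
import uuid


def build_trigger_request(base_url, namespace, flow_id, inputs):
    """Build (but do not send) a POST request that starts a flow execution.

    Assumption: the endpoint is /api/v1/executions/trigger/{namespace}/{flowId}
    and flow inputs are sent as multipart/form-data fields.
    """
    boundary = uuid.uuid4().hex
    parts = [
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
        f"{value}\r\n"
        for name, value in inputs.items()
    ]
    parts.append(f"--{boundary}--\r\n")  # closing boundary
    url = f"{base_url.rstrip('/')}/api/v1/executions/trigger/{namespace}/{flow_id}"
    return urllib.request.Request(
        url,
        data="".join(parts).encode("utf-8"),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        method="POST",
    )


req = build_trigger_request("http://localhost:8080", "dev", "first-flow", {"firstname": "Dario"})
print(req.get_method(), req.full_url)
# To actually send it (requires a running Kestra instance):
# with urllib.request.urlopen(req) as resp:
#     print(resp.read().decode())
```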

Python Tasks in Kestra Flows — How to Get Started

The Logic Behind Our Simple Data Pipeline

Image 11 — Contents of the JSONPlaceholder free API (image by author)
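The pipeline has three tasks that run one after another: download the JSON, reshape it into a CSV, and print the result. Each later task reads what an earlier one produced through an output expression. A toy Python model of that data flow follows; `run_flow`, the task functions, and the storage URI are all hypothetical and are not Kestra internals.

```python
def run_flow(tasks):
    """Run (task_id, function) pairs in order; each function receives all earlier outputs."""
    outputs = {}
    for task_id, fn in tasks:
        outputs[task_id] = fn(outputs)
    return outputs


def download_data(outputs):
    # Hypothetical stand-in: a download task stores the response and exposes its URI.
    return {"uri": "storage://dev/python-task/users.json"}


def process_data(outputs):
    source = outputs["downloadData"]["uri"]  # plays the role of {{outputs.downloadData.uri}}
    return {"files": {"usersCsv": source.replace(".json", ".csv")}}


def print_data(outputs):
    # plays the role of {{outputs.processData.files.usersCsv}}
    return {"read": outputs["processData"]["files"]["usersCsv"]}


result = run_flow([
    ("downloadData", download_data),
    ("processData", process_data),
    ("printData", print_data),
])
print(result["printData"]["read"])  # storage://dev/python-task/users.csv
```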

Writing Python Code in a YML File

id: python-task
namespace: dev
tasks:
  - id: downloadData
    type: io.kestra.plugin.fs.http.Download
    uri: http://jsonplaceholder.typicode.com/users

  - id: processData
    type: io.kestra.core.tasks.scripts.Python
    outputFiles:
      - usersCsv
    inputFiles:
      data.json: "{{outputs.downloadData.uri}}"
      main.py: |
        import json
        import pandas as pd
        from kestra import Kestra

        # Read JSON file
        with open("data.json", "r") as f:
            data = json.load(f)

        # Keep certain attributes
        df_src = []
        for r in data:
            df_src.append({
                "id": r["id"],
                "name": r["name"],
                "email": r["email"],
                "address": f"{r['address']['city']} - {r['address']['street']}, {r['address']['suite']}",
                "phone": r["phone"],
                "website": r["website"]
            })

        # Convert to pd.DataFrame and save
        df = pd.DataFrame(df_src)
        df.to_csv("{{outputFiles.usersCsv}}", index=False)
    runner: DOCKER
    dockerOptions:
      image: ghcr.io/kestra-io/pydata:latest

  - id: printData
    type: io.kestra.core.tasks.scripts.Bash
    inputFiles:
      data.csv: "{{outputs.processData.files.usersCsv}}"
    commands:
      - cat data.csv
Image 12 — Contents of a Python task flow (image by author)
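To check the transformation without Docker or Kestra, the `processData` logic can be pulled out into plain functions. This sketch swaps pandas for the standard-library `csv` module so it runs anywhere, and the sample record is made up, only shaped like a JSONPlaceholder user. It is an illustration, not the author's code.

```python
import csv
import io

FIELDS = ["id", "name", "email", "address", "phone", "website"]


def flatten_user(r):
    """Keep the same attributes as processData and flatten the nested address."""
    return {
        "id": r["id"],
        "name": r["name"],
        "email": r["email"],
        "address": f"{r['address']['city']} - {r['address']['street']}, {r['address']['suite']}",
        "phone": r["phone"],
        "website": r["website"],
    }


def users_to_csv(records):
    """Return CSV text with a header row; fields containing commas get quoted."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=FIELDS)
    writer.writeheader()
    writer.writerows(flatten_user(r) for r in records)
    return buf.getvalue()


# Hypothetical record shaped like a JSONPlaceholder user; extra keys are dropped.
sample = [{
    "id": 1, "name": "Jane Doe", "username": "jdoe", "email": "jane@example.com",
    "address": {"street": "Main St", "suite": "Apt. 1", "city": "Springfield", "zipcode": "00000"},
    "phone": "555-0100", "website": "example.com", "company": {"name": "Acme"},
}]
print(users_to_csv(sample))
```

Because the flattened address contains a comma, the CSV writer quotes it, so the file still parses back into six columns.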

Running the Kestra Flow with a Python Task

Image 13 — Workflow execution Gantt view (image by author)
Image 14 — Workflow execution Log view (image by author)

How to Schedule Kestra Flows with Cron

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 * * * *
id: python-task
namespace: dev
tasks:
  - id: downloadData
    type: io.kestra.plugin.fs.http.Download
    uri: http://jsonplaceholder.typicode.com/users

  - id: processData
    type: io.kestra.core.tasks.scripts.Python
    outputFiles:
      - usersCsv
    inputFiles:
      data.json: "{{outputs.downloadData.uri}}"
      main.py: |
        import json
        import pandas as pd
        from kestra import Kestra

        # Read JSON file
        with open("data.json", "r") as f:
            data = json.load(f)

        # Keep certain attributes
        df_src = []
        for r in data:
            df_src.append({
                "id": r["id"],
                "name": r["name"],
                "email": r["email"],
                "address": f"{r['address']['city']} - {r['address']['street']}, {r['address']['suite']}",
                "phone": r["phone"],
                "website": r["website"]
            })

        # Convert to pd.DataFrame and save
        df = pd.DataFrame(df_src)
        df.to_csv("{{outputFiles.usersCsv}}", index=False)
    runner: DOCKER
    dockerOptions:
      image: ghcr.io/kestra-io/pydata:latest

  - id: printData
    type: io.kestra.core.tasks.scripts.Bash
    inputFiles:
      data.csv: "{{outputs.processData.files.usersCsv}}"
    commands:
      - cat data.csv
triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 * * * *
Image 15 — Adding a schedule trigger (image by author)
Image 16 — Successful run of a scheduled flow (image by author)
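The `0 * * * *` expression fires at minute 0 of every hour. To make the five cron fields (minute, hour, day of month, month, day of week) concrete, here is a small, hypothetical Python helper that finds the next matching minute. It supports only `*` and single numbers, combines all fields with AND (real cron applies an OR rule when both day fields are restricted), and says nothing about how Kestra's own scheduler works.

```python
from datetime import datetime, timedelta


def field_matches(field, value):
    """Match one cron field; only "*" and plain integers are supported here."""
    return field == "*" or int(field) == value


def cron_matches(expr, dt):
    """Check a 5-field cron expression (minute hour day-of-month month day-of-week)."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (dt.weekday() + 1) % 7  # cron uses 0 = Sunday, Python uses 0 = Monday
    return (
        field_matches(minute, dt.minute)
        and field_matches(hour, dt.hour)
        and field_matches(dom, dt.day)
        and field_matches(month, dt.month)
        and field_matches(dow, cron_dow)
    )


def next_run(expr, after):
    """Return the first whole minute strictly after `after` that matches `expr`."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # give up after roughly a year of minutes
        if cron_matches(expr, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no match for {expr!r} within a year")


print(next_run("0 * * * *", datetime(2023, 6, 24, 10, 15)))  # 2023-06-24 11:00:00
```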

Kestra Impressions — Pros and Cons

Dario Radečić

Senior Data Scientist & Tech Writer