Airflow vs. Prefect vs. Kestra — Which is Best for Building Advanced Data Pipelines?

Jul 24, 2023


AWS Configuration for S3 and Postgres Database

Image 1 — S3 bucket on AWS (image by author)
Image 2 — S3 bucket access key (image by author)
Image 3 — Provisioning a Postgres database in AWS RDS (image by author)
Image 4 — Port 5432 inbound rule (image by author)
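Before moving on, it's worth a quick check that the access key from Image 2 can actually reach the new bucket. Here is a rough sketch using boto3; the bucket name matches the one used in the Airflow DAG later on, and the values in angle brackets are placeholders for your own credentials:

# Sketch: verify the new access key can list the (still empty) bucket
import boto3

session = boto3.Session(
    aws_access_key_id="<AWS_KEY>",
    aws_secret_access_key="<AWS_SECRET>"
)
s3 = session.client("s3")
print(s3.list_objects_v2(Bucket="demos3bucket-dr").get("KeyCount", 0))  # 0 for an empty bucket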
CREATE TABLE users(
    id INTEGER,
    name VARCHAR(128),
    email VARCHAR(256),
    gender VARCHAR(32),
    status VARCHAR(32),
    inserted_from VARCHAR(32)
);
Image 5 — Table creation on Postgres (image by author)
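With the inbound rule from Image 4 in place, a short psycopg2 snippet can confirm the database is reachable and the table exists. A minimal sketch, assuming psycopg2 is installed and the placeholders are replaced with your RDS values:

# Sketch: connect to the RDS instance and count rows in the new table
import psycopg2

conn = psycopg2.connect(
    host="<rds-endpoint>",
    port=5432,
    dbname="<database>",
    user="<username>",
    password="<password>"
)
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM users;")
    print(cur.fetchone())  # (0,) right after creating the table
conn.close()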

Data Pipeline in Airflow

Connection Configuration

Image 6 — Airflow REST API connection (image by author)
Image 7 — Airflow S3 connection (image by author)
Image 8 — Airflow Postgres connection (image by author)
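The screenshots above configure the three connections through the Airflow UI. If you prefer to keep them in code, the same connections can be registered against the metadata database instead. A rough sketch, using the connection IDs the DAG below expects and the gorest.co.in API that also appears later in the Kestra section; everything in angle brackets is a placeholder:

# Sketch: create the REST API, S3, and Postgres connections programmatically
from airflow.models import Connection
from airflow.settings import Session

session = Session()
session.add(Connection(
    conn_id="users_api",
    conn_type="http",
    host="https://gorest.co.in/public/v2/"
))
session.add(Connection(
    conn_id="s3_conn",
    conn_type="aws",
    extra='{"aws_access_key_id": "<key>", "aws_secret_access_key": "<secret>"}'
))
session.add(Connection(
    conn_id="postgres_db",
    conn_type="postgres",
    host="<rds-endpoint>",
    schema="<database>",
    login="<username>",
    password="<password>",
    port=5432
))
session.commit()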

Writing the DAG in Python

pip install 'apache-airflow[amazon]'
pip install 'apache-airflow[postgres]'
import json
import pandas as pd
from datetime import datetime
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Save the API response to disk in JSON format
def save_users_locally(ti) -> None:
    # xcom_pull with a single task_id returns that task's return value directly
    users = ti.xcom_pull(task_ids="get_users")
    with open("/home/airflow/users.json", "w") as f:
        json.dump(users, f)


# Upload the raw JSON file to AWS S3
def save_users_to_s3(filename, key, bucket_name):
    hook = S3Hook("s3_conn")
    hook.load_file(filename, key, bucket_name)


# Load the users from disk and append them to the Postgres table
def save_users_to_pg():
    with open("/home/airflow/users.json", "r") as f:
        users = json.load(f)

    users = pd.DataFrame(users)
    users["inserted_from"] = "airflow"
    hook = PostgresHook("postgres_db")
    users.to_sql("users", hook.get_sqlalchemy_engine(), if_exists="append", index=False)


with DAG(
    dag_id="airflow_pg_s3",
    start_date=datetime(2023, 7, 17),
    schedule_interval="@daily",
    catchup=False
) as dag:
    # 1. Get the data from a remote API
    task_api_fetch = SimpleHttpOperator(
        task_id="get_users",
        http_conn_id="users_api",
        endpoint="users/",
        method="GET",
        response_filter=lambda response: json.loads(response.text)
    )

    # 2. Save the file locally
    task_save_locally = PythonOperator(
        task_id="save_users_locally",
        python_callable=save_users_locally
    )

    # 3. Push the raw file to S3
    task_save_to_s3 = PythonOperator(
        task_id="save_users_s3",
        python_callable=save_users_to_s3,
        op_kwargs={
            "filename": "/home/airflow/users.json",
            "key": f"users_{int(datetime.timestamp(datetime.now()))}.json",
            "bucket_name": "demos3bucket-dr"
        }
    )

    # 4. Push to a Postgres database
    task_save_to_pg = PythonOperator(
        task_id="save_users_pg",
        python_callable=save_users_to_pg
    )

    task_api_fetch >> task_save_locally >> task_save_to_s3 >> task_save_to_pg
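To smoke-test the pipeline without waiting for the scheduler, Airflow 2.5+ can run a DAG as a plain Python script. A minimal sketch, appended to the same file:

# Optional: execute the whole DAG once, in-process, without the scheduler
if __name__ == "__main__":
    dag.test()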
Image 9 — Airflow DAG run (image by author)
Image 10 — File stored on S3 (image by author)
Image 11 — Users stored in database (image by author)

Data Pipeline in Prefect

Environment File Configuration

pip install python-dotenv
API_USERS=
AWS_KEY=
AWS_SECRET=
S3_BUCKET=
PG_HOST=
PG_PORT=
PG_USER=
PG_PASS=
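Before wiring the variables into the flow, a quick throwaway sketch confirms the .env file is actually being picked up:

# Sketch: confirm python-dotenv loads the expected variables
import os
from dotenv import load_dotenv

load_dotenv()
print(os.environ["S3_BUCKET"])  # should print your bucket name instead of raising KeyError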

Writing the Prefect Pipeline

pip install boto3
import os
import json
import boto3
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine
from prefect import task, flow
from prefect.deployments import Deployment

# Load environment variables from the .env file
load_dotenv()


# Get users from the remote API
@task
def get_users(url: str):
    req = requests.get(url=url)
    res = req.json()
    return res


# Save the users to disk as JSON
@task
def save_users_locally(users: dict, path: str):
    with open(path, "w") as f:
        json.dump(users, f)


# Upload the local file to an S3 bucket
@task
def save_users_to_s3(local_file_path: str, bucket_name: str, file_path: str):
    session = boto3.Session(
        aws_access_key_id=os.environ["AWS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET"]
    )
    s3_client = session.client("s3")
    s3_client.upload_file(local_file_path, bucket_name, file_path)


# Append the users to the Postgres table
@task
def save_users_to_pg(users):
    # Append "/<database>" after the port if the target database differs from the default
    engine = create_engine(f"postgresql://{os.environ['PG_USER']}:{os.environ['PG_PASS']}@{os.environ['PG_HOST']}:{os.environ['PG_PORT']}")

    users = pd.DataFrame(users)
    users["inserted_from"] = "prefect"
    users.to_sql("users", engine, if_exists="append", index=False)


# Actual flow - calls the individual tasks
@flow
def s3_pg_flow():
    p_local_file = "users.json"
    p_s3_file_name = f"users_{int(datetime.timestamp(datetime.now()))}.json"

    users = get_users(os.environ["API_USERS"])
    save_users_locally(users, p_local_file)
    save_users_to_s3(p_local_file, os.environ["S3_BUCKET"], p_s3_file_name)
    save_users_to_pg(users)


if __name__ == "__main__":
    s3_pg_flow()
Image 12 — Running the Prefect flow (image by author)
Image 13 — Prefect flow results stored in S3 (image by author)
Image 14 — Prefect flow results stored in Postgres (image by author)

Deploying a Prefect Pipeline

...

# Deploy the flow so you can schedule and trigger it from the Prefect UI
def deployment():
    deployment = Deployment.build_from_flow(
        flow=s3_pg_flow,
        name="s3_pg_deployment"
    )
    deployment.apply()


if __name__ == "__main__":
    deployment()
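Airflow schedules the DAG through schedule_interval and Kestra uses a cron trigger (shown later); Prefect can attach a schedule at deployment time. A hedged sketch for Prefect 2.x, where CronSchedule lives under prefect.server.schemas.schedules; adjust the import if your version differs:

# Sketch: the same deployment, but scheduled to run daily at midnight
from prefect.server.schemas.schedules import CronSchedule

def scheduled_deployment():
    deployment = Deployment.build_from_flow(
        flow=s3_pg_flow,
        name="s3_pg_deployment_daily",
        schedule=CronSchedule(cron="0 0 * * *")
    )
    deployment.apply()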
Image 15 — Deployed Prefect pipeline (image by author)

Data Pipeline in Kestra

Writing the Kestra Flow

id: kestra-s3-pg
namespace: dev

tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: getUsers
        type: io.kestra.plugin.scripts.python.Script
        runner: DOCKER
        docker:
          image: python:3.11-slim
        beforeCommands:
          - pip install requests > /dev/null
        warningOnStdErr: false
        script: |
          import json
          import requests

          URL = "http://gorest.co.in/public/v2/users"
          req = requests.get(url=URL)
          res = req.json()

          with open("users.json", "w") as f:
              json.dump(res, f)

      - id: saveUsersLocally
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - users.json

      - id: saveUsersS3
        type: io.kestra.plugin.aws.s3.Upload
        from: "{{outputs.saveUsersLocally.uris['users.json']}}"
        key: users-kestra.json
        bucket:
        region:
        accessKeyId:
        secretKeyId:

      - id: input
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data.users: "{{outputs.saveUsersLocally.uris['users.json']}}"

      - id: saveUsersPg
        type: io.kestra.plugin.scripts.python.Script
        beforeCommands:
          - pip install requests pandas psycopg2 sqlalchemy > /dev/null
        warningOnStdErr: false
        script: |
          import json
          import pandas as pd
          import requests
          from sqlalchemy import create_engine

          with open("data.users", "r") as f:
              users = json.load(f)

          df_users = pd.DataFrame(users)
          df_users['inserted_from'] = 'kestra'

          engine = create_engine(
              "postgresql://<username>:<password>@<host>:<port>"
          )

          df_users.to_sql("users", engine, if_exists="append", index=False)
Image 16 — Running a Kestra flow (image by author)
Image 17 — Kestra flow results stored in S3 (image by author)
Image 18 — Kestra flow results stored in Postgres (image by author)

Simplifying the Kestra Flow

id: postgresS3PythonScript
namespace: dev

tasks:
  - id: apiToPostgres
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install requests pandas psycopg2 sqlalchemy > /dev/null
    warningOnStdErr: false
    script: |
      import json
      import pandas as pd
      import requests
      from sqlalchemy import create_engine

      URL = "http://gorest.co.in/public/v2/users"
      req = requests.get(url=URL)
      res = req.json()

      with open("{{outputDir}}/users.json", "w") as f:
          json.dump(res, f)

      df_users = pd.DataFrame(res)
      df_users["inserted_from"] = "kestra"

      engine = create_engine("postgresql://<username>:<password>@<host>:<port>")

      df_users.to_sql("users", engine, if_exists="append", index=False)

  - id: s3upload
    type: io.kestra.plugin.aws.s3.Upload
    from: "{{outputs.apiToPostgres.outputFiles['users.json']}}"
    key: kestra-users.json
    bucket:
    region:
    accessKeyId:
    secretKeyId:
Image 19 — Simplifying the Kestra flow (image by author)

Scheduling a Kestra Flow

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 0 * * *"
Image 20 — Scheduled Kestra flow (image by author)

The Verdict — Which Data Orchestration Platform is the Best for 2023 and Beyond?

Written by Dario Radečić, Senior Data Scientist & Tech Writer