Financial Intelligence Data Platform (Part 1) – Building ETL Pipeline from Scratch

Nowadays, new AI products are released on a weekly basis thanks to the rapid advancement of foundation models. Every one of them, however, needs quality data infrastructure running under the hood. Unlike the README.md in the GitHub repo, which focuses on cloning the project, this blog post explains, as much as possible, how to build the project from scratch, as a guideline for anyone getting into Data Engineering in a professional environment. So, let’s get started.

  1. Project Architecture
  2. Tech Stack, Prerequisites, and Project Structures
    1. Tech Stacks
    2. Prerequisites
    3. Project Structure
  3. Key Design Features
  4. How the project was built
    1. Setting up the project
      1. Create and initialize a PostgreSQL container with Docker
      2. Create and configure a virtual environment
      3. Save and secure your environment variables
    2. Pipeline walkthrough
      1. Extract
      2. Transform
      3. Modeling
      4. Load
      5. Orchestrate the Pipeline
    3. Testing the pipeline
    4. Querying the pipeline
  5. Conclusion

Project Architecture

At this point, this project focuses on building an ETL pipeline on top of the public AlphaVantage API. In the extraction step, we get the API response as JSON and store it on the local machine. We then transform that data and load it into a local PostgreSQL database. We also add a ‘Modeling’ layer that creates database tables whenever a new one is needed.

Tech Stack, Prerequisites, and Project Structures

Tech Stacks

Amongst the many tools available, we build this ETL pipeline with the Python and PostgreSQL combination, as they are popular tools amongst modern Data Engineers. Note that we aren’t implementing any fancy AI models at this point; we’re focusing on building the ETL pipeline as a solid foundation. We will show you how to use those fancier tools in the following projects.

Layer                    Tools used
Language                 Python 3.9+
Database                 PostgreSQL 15
Database Driver          psycopg2-binary
Database Toolkit         SQLAlchemy
Data Processing Library  pandas
HTTP                     requests
Configuration            python-dotenv
Containerization         Docker
Testing                  pytest

Prerequisites

  • Python 3.9 or later
  • Docker Desktop (for hosting and running PostgreSQL container)
  • An AlphaVantage API key (you can sign up for a free one here)

AlphaVantage is a financial data services provider that also offers a public API to access its data: not only OHLCV (Open, High, Low, Close, Volume) data, but also financial reports and other useful financial information for developers to work with.

With a free-tier API key, we can make 5 requests per minute and 25 requests per day. Our project was built with these limits in mind. At production scale, however, we could later purchase a premium API key with more relaxed limits.
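To see what those limits mean in practice, here is a small sketch of the budget arithmetic the pipeline relies on. The helper names (`request_spacing`, `max_symbols_per_day`) are ours for illustration, not part of the project code; the "two calls per symbol" assumption matches the fact that we fetch both prices and metadata per stock.

```python
# Sketch of the free-tier budget arithmetic (helper names are illustrative,
# not part of the project code).
PER_MINUTE_LIMIT = 5   # AlphaVantage free tier: 5 requests per minute
DAILY_LIMIT = 25       # and 25 requests per day

def request_spacing(per_minute_limit: int) -> float:
    """Seconds to sleep between requests to never exceed the per-minute limit."""
    return 60.0 / per_minute_limit

def max_symbols_per_day(daily_limit: int, requests_per_symbol: int = 2) -> int:
    """Each symbol costs two calls: one for prices, one for company metadata."""
    return daily_limit // requests_per_symbol

print(request_spacing(PER_MINUTE_LIMIT))  # 12.0 seconds between calls
print(max_symbols_per_day(DAILY_LIMIT))   # at most 12 symbols per day
```

This 12-second spacing is exactly why the orchestrator later sleeps after every API call.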

Project Structure

In this part, our finalized project directory will look like this. We will explain each piece in the following sections of this blog.

finance_data_platform/
├── docker-compose.yml
├── requirements.txt
├── .env.example
├── sql/
│   ├── sma.sql                    # Calculate SMA5 and SMA20
│   ├── daily_returns.sql          # Calculate daily returns
│   ├── volatility.sql             # Calculate volatility (standard deviation) of returns over the last 21 trading days
│   └── cumulative_return.sql      # Calculate cumulative returns from the earliest OHLCV data of each stock
├── data/
│   ├── raw/{symbol}/              # Raw JSON responses from AlphaVantage
│   └── analytics/                 # Extracted files for SQL queries
├── src/
│   ├── db_connect.py              # psycopg2 connection factory (reads from .env)
│   ├── pipeline.py                # Main ETL orchestration (fetch → transform → load)
│   ├── reprocess_pipeline.py      # Load from local files without API calls
│   ├── analytics.py               # Save SQL query results into .csv files
│   │
│   ├── extract/
│   │   └── alphavantage_ingest.py # API fetch with rate-limit retry and quota detection
│   │
│   ├── transform/
│   │   └── transform_stock.py     # OHLCV + calendar attribute derivation; metadata parsing
│   │
│   ├── load/
│   │   ├── dimension_loader.py    # Bulk-insert dim_date and dim_metadata (ON CONFLICT DO NOTHING)
│   │   └── fact_loader.py         # Bulk-insert stock_prices; get_max_loaded_date for incremental load
│   │
│   └── modeling/
│       ├── create_dimension_tables.py
│       ├── create_fact_tables.py
│       └── create_indexes.py
└── tests/
    ├── conftest.py                # db_cursor fixture — isolated TEMP tables per test
    ├── test_transform.py          # Unit tests for transformation logic
    └── test_loaders.py            # Integration tests for all loaders (dim + fact)

Key Design Features

  • Batch ETL Pipelines: Fetch, Transform, and Load financial data
  • Star Schema: Dimensional modeling for analytical queries
  • Error Handling: API rate limit handling, database error rollback
  • Containerization: with a PostgreSQL container (run by Docker) and a virtual environment, this project can run on almost any machine
  • Testing: Unit tests for critical data transformation logic
  • Transaction Safety: Atomic operations with rollback on failure
  • Single transaction per symbol: the dimension and fact tables share a single connection commit, preventing orphaned dimension rows without corresponding facts
  • Idempotent writes: all insert statements use ON CONFLICT DO NOTHING, making them safe to re-run
  • Incremental loading: get_max_loaded_date() prevents re-inserting OHLCV data that is already in the database, keeping loads efficient after the initial backfill
  • Bulk inserts: wherever possible, we load into the database in bulk, which is significantly faster than inserting row by row
  • Test isolation: the tests run against temporary tables to exercise the connection and ETL logic, keeping test data isolated from the real data
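The incremental-loading idea above can be seen in isolation with a tiny DataFrame. This is a simplified sketch of the same date filter the orchestrator applies later (the variable names are illustrative):

```python
import pandas as pd
from datetime import date

# pretend the transform step produced three days of OHLCV rows
df = pd.DataFrame({
    "symbol": ["AAPL"] * 3,
    "date": pd.to_datetime(["2024-07-01", "2024-07-02", "2024-07-03"]),
    "close": [150.0, 151.5, 149.8],
})

# pretend the database already holds everything up to 2024-07-02
max_loaded = date(2024, 7, 2)

# keep only rows strictly newer than what is already loaded
new_rows = df[df["date"].dt.date > max_loaded]
print(len(new_rows))  # 1: only 2024-07-03 still needs loading
```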

How the project was built

Setting up the project

Create and initialize a PostgreSQL container with Docker

First, we create a PostgreSQL Docker container so we can run the database without actually installing it on the local machine, and without worrying about the local machine’s dependencies.

To do so, we create the docker-compose.yml file first. As we need only PostgreSQL in the container, we need to define the container’s specification as follows:

docker-compose.yml
# define the compose file format version
version: '3.8'
# define the services run in the container
services:
  postgres:
    image: postgres:15
    container_name: finance_postgres
    restart: always
    environment:
      POSTGRES_USER: finance_user
      POSTGRES_PASSWORD: finance_password
      POSTGRES_DB: finance_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:

After defining the container’s specification, we need to compose it as a container using the following command in the terminal.

PowerShell
docker compose up -d

Then, verify that the container is running with the first command below. Make sure your container name appears in the output; otherwise, stop whatever other container is using the same port (in this case, 5432 for PostgreSQL) and compose again. Finally, open a session inside the container with the second command.

PowerShell
# verify the container
docker ps
# run the container
docker exec -it finance_postgres psql -U finance_user -d finance_db

If you do that correctly, the terminal will switch to the PostgreSQL prompt, which starts with finance_db=#.

Create and configure a virtual environment

After creating the PostgreSQL container so the database runs without dependency concerns, we also want the project itself to run on any machine, regardless of its installed packages. To do so, we create a Virtual Environment for the project using the following commands.

PowerShell
# create venv folder
python -m venv venv
# for PowerShell user, we need to set the execution policy before activating the venv
Set-ExecutionPolicy -Scope CurrentUser -ExecutionPolicy Unrestricted -Force
# activate the venv, for PowerShell users, we use the followings
.\venv\Scripts\Activate.ps1

After we activate the virtual environment, we need to install the required Python libraries into the project using this command.

PowerShell
# install the required packages
pip install requests pandas psycopg2-binary python-dotenv pytest
# save the required packages as requirements
pip freeze > requirements.txt

Save and secure your environment variables

In practice, we don’t hardcode confidential information, like the API key and database credentials, directly into Python scripts, as that exposes it to unauthorized use. Instead, we save it in a file that is never committed publicly: the environment (.env) file. So, create the .env file, then put the following data into it.

.env
ALPHAVANTAGE_API_KEY=<alphavantage_api_key_here>
DB_HOST=localhost
DB_PORT=5432
DB_NAME=finance_db
DB_USER=finance_user
DB_PASSWORD=finance_password

If you’re going to share this project publicly through a GitHub repository, make sure you add the .env file to .gitignore before committing. The .gitignore should therefore contain at least this entry:

.gitignore
.env

Pipeline walkthrough

Extract

At this point in the project, we’re going to extract:

  • Daily OHLCV data of the specified stocks
  • Stock metadata containing the full company name, sector, industry, financial ratios, and so on

To do so, we look up the AlphaVantage API documentation to see how to get that data. We find that we can fetch daily OHLCV data from the “TIME_SERIES_DAILY” function and the company metadata from the “OVERVIEW” function.

As we’re using a free-tier API key, we obviously have request limitations: 5 API requests per minute and 25 requests per day. Moreover, OHLCV data is limited to daily granularity for the last 100 days (outputsize=compact). So, we need to design the fetching mechanism around these limitations.

Here’s what the code looks like, for now:

src\extract\alphavantage_ingest.py
import requests
import os
import json
from datetime import datetime
import time
import logging
import pathlib
from dotenv import load_dotenv

# load environment variables from the .env file into the process
load_dotenv()
# read the API key from .env
API_KEY = os.getenv('ALPHAVANTAGE_API_KEY')
# define the logger object to report progress along the way
logger = logging.getLogger(__name__)
# the target directory where the fetched files are saved
RAW_DATA_DIR = pathlib.Path(__file__).parent.parent.parent / "data" / "raw"

# fetch (extract) the OHLCV data
def fetch_stock_prices(symbol):
    # to fetch the data, we need the query URL
    url = "https://www.alphavantage.co/query"
    # and the query itself, passed as a dictionary to requests.get();
    # the dictionary keys must match the API function parameters
    params = {
        "function": "TIME_SERIES_DAILY",  # function to call for daily OHLCV data
        "symbol": symbol,                 # pass the symbol from the method parameters into the query
        "apikey": API_KEY,                # refer to the loaded API key
        "outputsize": "compact"           # define the output size
    }
    # fetch the data; if a request takes more than 60 secs, kill it
    response = requests.get(url, params=params, timeout=60)
    # for any response other than 200, raise an HTTPError
    response.raise_for_status()
    # parse the response as JSON
    data = response.json()
    # AlphaVantage returns "Information" when the request wasn't fulfilled,
    # e.g. attempting intraday data with a free-tier API key
    if "Information" in data:
        raise RuntimeError(f"AlphaVantage daily quota exceeded for {symbol}: {data['Information']}")
    # when the usage quota is exceeded, AlphaVantage returns a "Note" response,
    # so we wait and retry a few times
    if "Note" in data:
        for attempt in range(1, 4):
            logger.warning(f"API rate limit reached for {symbol}. Attempt {attempt}/3 — waiting 60 seconds...")
            time.sleep(60)
            retry_response = requests.get(url, params=params, timeout=60)
            retry_response.raise_for_status()
            retry_data = retry_response.json()
            if "Note" not in retry_data:
                data = retry_data
                break
        else:
            raise RuntimeError(f"API rate limit for {symbol} persisted after 3 retry attempts")
    # when the request is invalid or cannot be processed, AlphaVantage returns "Error Message"
    if "Error Message" in data:
        logger.error(f"API error for {symbol}: {data['Error Message']}")
        raise RuntimeError(f"AlphaVantage API error for {symbol}: {data['Error Message']}")
    # create the timestamp for the OHLCV data
    timestamp = datetime.utcnow().strftime("%Y-%m-%d")
    # build the target filepath; create the directory if it doesn't exist yet
    filepath = RAW_DATA_DIR / symbol / f"{symbol}_{timestamp}.json"
    filepath.parent.mkdir(parents=True, exist_ok=True)
    # write the extracted JSON response into the file, then close it
    with open(filepath, "w") as f:
        json.dump(data, f)
    return str(filepath)

# fetch the company metadata
def fetch_company_metadata(symbol):
    # define the URL and query parameters, as for the OHLCV data
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "OVERVIEW",
        "symbol": symbol,
        "apikey": API_KEY
    }
    # fetch the response and parse it
    response = requests.get(url, params=params, timeout=60)
    response.raise_for_status()
    data = response.json()
    # error handling
    if "Information" in data:
        raise RuntimeError(f"AlphaVantage daily quota exceeded for {symbol} metadata: {data['Information']}")
    if "Note" in data:
        for attempt in range(1, 4):
            logger.warning(f"API rate limit reached fetching metadata for {symbol}. Attempt {attempt}/3 — waiting 60 seconds...")
            time.sleep(60)
            retry_response = requests.get(url, params=params, timeout=60)
            retry_response.raise_for_status()
            retry_data = retry_response.json()
            if "Note" not in retry_data:
                data = retry_data
                break
        else:
            raise RuntimeError(f"API rate limit for {symbol} metadata persisted after 3 retry attempts")
    if "Error Message" in data:
        raise RuntimeError(f"AlphaVantage API error for {symbol} metadata: {data['Error Message']}")
    # create the raw .json file
    filepath = RAW_DATA_DIR / symbol / f"{symbol}_metadata.json"
    filepath.parent.mkdir(parents=True, exist_ok=True)
    # write the response into the .json file, then close
    with open(filepath, "w") as f:
        json.dump(data, f)
    return str(filepath)
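The quota, rate-limit, and error checks above all share one shape: inspect the parsed JSON for a sentinel key. Pulling that decision into a pure function makes it easy to exercise without any network access. This is a hedged sketch of the same logic; `classify_response` is our name, not a function in the repo.

```python
def classify_response(data: dict) -> str:
    """Mirror the sentinel-key checks from alphavantage_ingest.py on a parsed payload."""
    if "Information" in data:
        return "quota_exceeded"  # daily quota hit, or an endpoint the free tier doesn't cover
    if "Note" in data:
        return "rate_limited"    # per-minute limit hit; the caller should wait and retry
    if "Error Message" in data:
        return "error"           # invalid symbol or malformed request
    return "ok"

print(classify_response({"Note": "please slow down"}))  # rate_limited
print(classify_response({"Time Series (Daily)": {}}))   # ok
```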

Transform

In the transformation phase, we read the saved .json file, select the columns we need, create new ones through calculation, and typecast values to comply with SQL data types. As in the extract phase, we transform (1) the OHLCV data and (2) the company metadata, using the following instructions.

src\transform\transform_stock.py
import json
import pandas as pd

# Transform OHLCV data
def transform_stock_prices(filepath, symbol):
    # read the extracted JSON file
    with open(filepath, "r") as f:
        data = json.load(f)
    # get the daily OHLCV values from the "Time Series (Daily)" key in the JSON data
    # >> refer to the API documentation for other endpoints
    time_series = data.get("Time Series (Daily)", {})  # the {} default returns an empty dict instead of raising if the key is missing
    # create a blank list that we append to in the following loop
    records = []
    for date, values in time_series.items():
        # convert the date string to a datetime object once per row, so the
        # calendar attributes below don't each re-parse the string
        dt = pd.to_datetime(date)
        # select the symbol, date, and OHLCV data, then derive the calendar columns
        records.append({
            "symbol": symbol,  # pass the function argument through to the record
            "date": dt,
            "open": float(values["1. open"]),
            "high": float(values["2. high"]),
            "low": float(values["3. low"]),
            "close": float(values["4. close"]),
            "volume": int(values["5. volume"]),
            "day": dt.day,
            "month": dt.month,
            "year": dt.year,
            "quarter": ((dt.month - 1) // 3) + 1,  # ensure the quarter is classified correctly
            "day_of_week": dt.dayofweek,           # Monday=0, Sunday=6
            "week_of_year": int(dt.isocalendar().week)  # cast to int to avoid pandas' Int64 type, which psycopg2 can't adapt when loading
        })
    # convert the list into a dataframe
    df = pd.DataFrame(records)
    return df

# Transform the company's metadata
def transform_company_metadata(filepath):
    with open(filepath, "r") as f:
        data = json.load(f)
    # get the dictionary values
    symbol = data.get("Symbol", "")
    if not symbol:
        raise ValueError("Metadata file is missing 'Symbol' field. The file may contain an API error response.")
    # this time, we store it as a dictionary
    metadata = {
        "symbol": symbol,
        "company_name": data.get("Name", ""),
        "sector": data.get("Sector", "")
    }
    return metadata

You can see that we return a DataFrame for the OHLCV data, but just a Python dictionary for the company metadata. The reason is that the OHLCV data is multi-row tabular data, while the metadata source file is just a single row.
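To see the calendar derivation from transform_stock_prices in isolation, here is a self-contained sketch applying the same formulas to a known date (`derive_calendar` is an illustrative name, not a function in the repo):

```python
import pandas as pd

def derive_calendar(date_str):
    """The same calendar attributes the transform derives for each OHLCV row."""
    dt = pd.to_datetime(date_str)
    return {
        "day": dt.day,
        "month": dt.month,
        "year": dt.year,
        "quarter": ((dt.month - 1) // 3) + 1,        # months 1-3 -> Q1, 4-6 -> Q2, ...
        "day_of_week": dt.dayofweek,                 # Monday=0, Sunday=6
        "week_of_year": int(dt.isocalendar().week),  # plain int, safe for psycopg2
    }

attrs = derive_calendar("2024-07-01")  # a Monday at the start of Q3
print(attrs["quarter"], attrs["day_of_week"])  # 3 0
```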

Modeling

Basically, the modeling process is the layer that monitors and creates tables in the SQL database. It might not be required in a classic ETL pipeline, since we could build the tables directly in the SQL database just once, but adding the modeling layer has its own benefits, such as:

  1. Reproducibility across environments: if we need to run this project on a new machine e.g. a teammate’s laptop, a Docker container, or a CI/CD server — the modeling layer rebuilds the schema automatically, without creating the tables manually in a SQL terminal.
  2. Version control & auditability: when we change a column through this layer, the change is visible across Git commits, leaving an audit trail for future inspection.
  3. Orchestration: as table creation is wrapped in a Python method, it’s also callable from our orchestration script (pipeline.py). We place it before the loading process to ensure the transformed data has a table in the SQL database to be loaded into; otherwise, we would have to create the tables by hand first.
  4. Safe to run repeatedly: with a proper query, the CREATE TABLE IF NOT EXISTS clause ensures no table is recreated needlessly.

At this phase of the project, we only need to create 3 SQL tables:

  • OHLCV data as the fact tables
  • Dates and Metadata as the dimension tables

Here’s what the fact table model looks like.

src\modeling\create_fact_tables.py
# borrow the connection config from db_connect.py
from db_connect import db_connect

def create_fact_table():
    # open the SQL connection
    conn = db_connect()
    # create a cursor object to interact with the database
    cursor = conn.cursor()
    # create the table in the SQL database if it doesn't exist yet
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS stock_prices (
            symbol TEXT,
            date DATE,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            volume BIGINT,
            PRIMARY KEY (symbol, date),
            FOREIGN KEY (symbol) REFERENCES dim_metadata(symbol),
            FOREIGN KEY (date) REFERENCES dim_date(date)
        );
        """
    )
    # commit the transaction
    conn.commit()
    # close both the cursor object and the connection
    cursor.close()
    conn.close()

# entry point guard so the script can also be run standalone
if __name__ == "__main__":
    create_fact_table()

And here’s what the dimension tables look like.

src\modeling\create_dimension_tables.py
from db_connect import db_connect

def create_dim_dates():
    # open the SQL connection; it requires host, port, database name, username, and password
    conn = db_connect()
    # create a cursor object to interact with the database
    cursor = conn.cursor()
    # create the dim_date table if it doesn't exist, otherwise do nothing
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS dim_date (
            date DATE PRIMARY KEY,
            day INT,
            month INT,
            year INT,
            quarter INT,
            day_of_week INT,
            week_of_year INT
        );
        """
    )
    conn.commit()   # commit the transaction to save changes
    cursor.close()  # close the cursor
    conn.close()    # close the connection

def create_dim_metadata():
    conn = db_connect()
    cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS dim_metadata (
            symbol TEXT PRIMARY KEY,
            company_name TEXT,
            sector TEXT
        );
        """
    )
    conn.commit()
    cursor.close()
    conn.close()

# entry point guard
if __name__ == "__main__":
    create_dim_dates()
    create_dim_metadata()

Load

After we ensure that the fetched data is cleaned into the format we need (transform) and we have the proper table to load it into (modeling), the next step is to load the transformed data into the modeled SQL tables.

Here, as in the prior steps, we have a fact loader for the OHLCV data and a dimension loader for the dates and metadata.

Before loading data into SQL, we need to ensure the loaded data doesn’t duplicate what is already in the tables. This matters especially for the OHLCV data: each fetch returns the last 100 days of history, so without proper regulation the table would quickly fill with redundant copies of the same rows.
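The idempotency that ON CONFLICT ... DO NOTHING gives us can be demonstrated without Postgres. Here is the same idea using SQLite’s analogous INSERT OR IGNORE (stdlib, so it runs anywhere); the table is a trimmed-down stand-in for stock_prices, not the project’s real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# a trimmed-down stand-in for the stock_prices fact table
cur.execute(
    "CREATE TABLE stock_prices (symbol TEXT, date TEXT, close REAL, "
    "PRIMARY KEY (symbol, date))"
)

rows = [("AAPL", "2024-07-01", 150.0), ("AAPL", "2024-07-02", 151.5)]
# the first pass inserts both rows; re-running the exact same batch inserts
# nothing, because the composite primary key already exists (INSERT OR IGNORE
# plays the role ON CONFLICT DO NOTHING plays in Postgres)
for _ in range(2):
    cur.executemany("INSERT OR IGNORE INTO stock_prices VALUES (?, ?, ?)", rows)

cur.execute("SELECT COUNT(*) FROM stock_prices")
count = cur.fetchone()[0]
print(count)  # 2, not 4: the re-run was a no-op
conn.close()
```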

Here’s how we created the loading logic for the fact table. Note that we use the get_max_loaded_date() method to find the latest loaded date for each stock in the database. Once we have that date, we compare it against the transformed data to find the incremental rows that still need to be loaded. You’ll see how it works in the orchestration phase of this pipeline.

src\load\fact_loader.py
from psycopg2.extras import execute_values
import logging

logger = logging.getLogger(__name__)

# insert data into the stock_prices table as a bulk, using the caller's cursor
def load_stock_prices(cursor, df):
    # convert all numpy types to Python native types to avoid psycopg2 adapter errors
    fact_columns = ["symbol", "date", "open", "high", "low", "close", "volume"]  # select the columns
    df_copy = df[fact_columns].copy()  # create a copy with the selected columns to avoid modifying the original DataFrame
    df_copy['date'] = df_copy['date'].dt.to_pydatetime()  # numpy.datetime64 -> datetime
    # use .values.tolist() to convert numpy types to native Python types
    values = [tuple(row) for row in df_copy[fact_columns].values.tolist()]
    # the query that loads the transformed values into the SQL table
    insert_query = """
        INSERT INTO stock_prices (symbol, date, open, high, low, close, volume)
        VALUES %s
        ON CONFLICT (symbol, date) DO NOTHING
    """
    # load the values into the table as a bulk
    execute_values(cursor, insert_query, values)
    logger.info(f"Loaded {len(values)} new rows into stock_prices")

def get_max_loaded_date(cursor, symbol):
    """
    Check the latest date loaded for each symbol in the fact table.
    """
    cursor.execute(
        "SELECT MAX(date) FROM stock_prices WHERE symbol = %s;",
        (symbol,)
    )
    # fetchone() returns a tuple; we want the first element, which is the max date
    result = cursor.fetchone()
    max_date = result[0]
    logger.info(f"Latest loaded date for {symbol}: {max_date}")
    return max_date

And this one for dimension tables:

src\load\dimension_loader.py
from psycopg2.extras import execute_values
import logging

# create a logging object to surface the execution log to the user
logger = logging.getLogger(__name__)

def load_dim_dates(cursor, df):
    """
    Load the date dimensions in bulk,
    which reduces load time significantly compared to row-by-row iteration.
    """
    date_cols = ['date', 'day', 'month', 'year', 'quarter', 'day_of_week', 'week_of_year']
    df = df[date_cols].drop_duplicates()
    values = [tuple(row) for row in df.values.tolist()]  # convert each row to a tuple so the list can be loaded as a bulk
    insert_query = """
        INSERT INTO dim_date (date, day, month, year, quarter, day_of_week, week_of_year)
        VALUES %s
        ON CONFLICT (date) DO NOTHING;
    """
    # load the values as a bulk into the SQL table
    execute_values(cursor, insert_query, values)
    logger.info(f"Loaded {len(values)} rows into dim_date")

def load_dim_metadata(cursor, metadata):
    # insert one row (the metadata) into the SQL metadata table
    cursor.execute(
        """
        INSERT INTO dim_metadata (symbol, company_name, sector)
        VALUES (%s, %s, %s)
        ON CONFLICT (symbol) DO NOTHING;
        """,
        (metadata["symbol"], metadata["company_name"], metadata["sector"])
    )
    logger.info(f"Loaded metadata for {metadata['symbol']} into dim_metadata")

Orchestrate the Pipeline

Finally, we orchestrate those methods so they work together in a defined sequence. Here’s how we designed the orchestration process:

  • Create the fact and dimension tables for the OHLCV data and metadata, respectively.

Then, for each selected stock, we run the ETL pipeline through the following steps.

  • Fetch OHLCV data and metadata from AlphaVantage with the free-tier API. We let the code sleep for 12 secs after each API request to comply with the free tier’s per-minute limit (5 requests per minute).
  • Transform the OHLCV data and metadata into the desired form.
  • Look up the latest data we have in the SQL table, and build a filter that loads only the data points that occurred after that date.
  • Load both the OHLCV data and the metadata of the stock into the SQL tables.

At each step, we log progress to leave a trail for users when something goes wrong.

Here’s what the orchestration process looks like:

src\pipeline.py
import logging
import time
from db_connect import db_connect
from extract.alphavantage_ingest import fetch_stock_prices, fetch_company_metadata
from transform.transform_stock import transform_stock_prices, transform_company_metadata
from load.fact_loader import load_stock_prices, get_max_loaded_date
from load.dimension_loader import load_dim_dates, load_dim_metadata
from modeling.create_dimension_tables import create_dim_dates, create_dim_metadata
from modeling.create_fact_tables import create_fact_table
from modeling.create_indexes import create_indexes

# configure logging to display info-level messages with timestamps
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

def run(symbols):
    # build up the connection with the borrowed scripts from db_connect.py
    conn = db_connect()
    cursor = conn.cursor()  # create a cursor object to interact with the database
    for symbol in symbols:
        try:
            # fetch the data from the API; we sleep 12 secs after each API call to respect the free-tier limit (5 calls per minute)
            logger.info(f"Fetching {symbol}")
            filepath = fetch_stock_prices(symbol)
            time.sleep(12)
            metadata_filepath = fetch_company_metadata(symbol)
            time.sleep(12)
            # transform the data and metadata
            logger.info("Transforming")
            df = transform_stock_prices(filepath, symbol)
            metadata = transform_company_metadata(metadata_filepath)
            # incremental loading: check the max date already loaded for the symbol in the fact table
            max_date = get_max_loaded_date(cursor, symbol)
            if max_date is not None:
                df = df[df["date"].dt.date > max_date]  # keep only rows with a date greater than max_date
                logger.info(f"{symbol}: {len(df)} new rows after {max_date}")
            if df.empty:
                logger.info(f"{symbol}: no new data to load, skipping")
                continue
            # load the dimension tables first, as the fact table has foreign keys referencing them
            logger.info("Populating dimension tables")
            load_dim_dates(cursor, df)
            load_dim_metadata(cursor, metadata)
            # load the fact table with the transformed data
            logger.info("Loading into fact table")
            load_stock_prices(cursor, df)
            conn.commit()
        # if anything goes wrong for a stock, we roll back the uncommitted work for that stock, then raise the error
        except Exception as e:
            conn.rollback()
            logger.error(f"Pipeline failed: {e}")
            raise

# orchestrate all processes together
if __name__ == "__main__":
    symbols = ["NVDA", "AAPL", "MSFT", "GOOGL", "AMZN"]
    create_dim_dates()     # ensure the dim_date table is created before loading data
    create_dim_metadata()  # ensure the dim_metadata table is created before loading data
    create_fact_table()    # ensure the fact table is created before loading data
    create_indexes()       # create indexes on the fact table for performance
    run(symbols)

Testing the pipeline

There’s no proper ETL pipeline without proper testing. So, we create a testing package for this project. This is how we do it:

After we create a /tests folder, we create __init__.py, which is basically a blank file. Doing so lets Python treat the /tests folder as a package, which can be managed and run with more ease and resilience. Even though modern pytest (3.0+) no longer requires it, an __init__.py still works well alongside pytest.ini or pyproject.toml, which are the common workflow configurations these days.
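If you prefer registering the integration marker in a config file instead of (or alongside) conftest.py, a minimal pytest.ini at the project root could look like this. This is a sketch of the common setup, not a file from the repo:

```ini
[pytest]
markers =
    integration: requires a live PostgreSQL connection (docker compose up -d)
```

With the marker registered, `pytest -m integration` runs only the integration tests, and `pytest -m "not integration"` runs only the unit tests.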

Then, we set up conftest.py to register the test marker (in this case, integration) and to define the test fixture with the @pytest.fixture decorator for the later integration tests. Here’s what it looks like:

tests\conftest.py
import pytest
import psycopg2
from src.db_connect import db_connect

# register the integration marker. We will apply it to the integration tests,
# and we can also select only the integration tests later from the terminal.
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "integration: mark a test as an integration test requiring a live PostgreSQL connection (docker compose up -d)"
    )

# the fixture we will use whenever the loader tests need the tables
@pytest.fixture(scope="function")
def db_cursor():
    """
    Connect to the real Postgres instance and create TEMP tables
    that mimic the production schema. Each test gets a clean slate;
    the connection is rolled back and closed after the test regardless of pass/fail.
    Skip automatically if Postgres is not reachable.
    """
    try:
        conn = db_connect()
    except psycopg2.OperationalError:
        pytest.skip("PostgreSQL is not reachable, compose the docker container first via: docker compose up -d")
    conn.autocommit = False
    cursor = conn.cursor()
    # TEMP tables are session-scoped and don't support cross-table foreign key (FK)
    # constraints, so they are defined without them here. They are auto-dropped
    # when the connection is closed.
    # create the temp date dimension table
    cursor.execute(
        """
        CREATE TEMP TABLE dim_date (
            date DATE PRIMARY KEY,
            day INT,
            month INT,
            year INT,
            quarter INT,
            day_of_week INT,
            week_of_year INT
        );
        """
    )
    # create the temp metadata dimension table
    cursor.execute(
        """
        CREATE TEMP TABLE dim_metadata (
            symbol TEXT PRIMARY KEY,
            company_name TEXT,
            sector TEXT
        );
        """
    )
    # create the temp fact table
    cursor.execute(
        """
        CREATE TEMP TABLE stock_prices (
            symbol TEXT,
            date DATE,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            volume BIGINT,
            PRIMARY KEY (symbol, date)
        );
        """
    )
    # yield the cursor to the test function; this separates the setup and
    # teardown logic, which is a best practice for test fixtures
    yield cursor
    conn.rollback()  # roll back any changes to ensure test isolation
    cursor.close()
    conn.close()

Then, we conduct tests on the transform and load processes in test_transform.py and test_loaders.py, respectively. You can see the full tests in this GitHub repository, but here is a brief look at how they work.

For transformation testing, we build a class for each transformation component (OHLCV, metadata), plus the edge cases. We define a mock row as a fixture, run it through the actual transformation function, and assert that the results are what we need. Here’s an example of what it looks like:

tests\test_transform.py
import pytest
import pandas as pd
from datetime import datetime
import json
import tempfile
import os
from src.transform.transform_stock import transform_stock_prices, transform_company_metadata

class TestTransformStockPrices:
    ...

class TestTransformCompanyMetadata:
    @pytest.fixture
    def sample_metadata_response(self):
        """
        Create a mock metadata API response to interact with the following testing methods.
        """
        return {
            "Symbol": "AAPL",
            "Name": "Apple Inc.",
            "Sector": "Technology",
            "Industry": "Consumer Electronics"
        }

    def test_transform_metadata_returns_dict(self, sample_metadata_response):
        """
        Ensure that transformed data is a dictionary.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json.dump(sample_metadata_response, f)
            temp_filepath = f.name
        try:
            result = transform_company_metadata(temp_filepath)
            assert isinstance(result, dict)
        finally:
            os.unlink(temp_filepath)
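Beyond the happy path, the edge-case tests cover malformed responses. Here is a minimal, self-contained sketch of that pattern; the fallback behaviour of `transform_metadata_sketch` is an assumption for illustration, not the project's actual `transform_company_metadata` implementation:

```python
import json
import os
import tempfile

def transform_metadata_sketch(filepath):
    # Hypothetical stand-in for transform_company_metadata:
    # map API fields to snake_case keys, defaulting any missing field
    with open(filepath) as f:
        raw = json.load(f)
    return {
        "symbol": raw.get("Symbol", "UNKNOWN"),
        "company_name": raw.get("Name", "UNKNOWN"),
        "sector": raw.get("Sector", "UNKNOWN"),
    }

# Edge case: the API response is missing the "Sector" key
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
    json.dump({"Symbol": "AAPL", "Name": "Apple Inc."}, f)
    temp_filepath = f.name
try:
    result = transform_metadata_sketch(temp_filepath)
finally:
    os.unlink(temp_filepath)
```

An edge-case test would then simply assert that `result["sector"]` falls back to the default value.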

Since the transformation phase doesn't require communicating with the containerized database, we can call these 'unit tests'. Next is testing the load phase, which does need to talk to the Dockerized PostgreSQL instance, because we must ensure that data migrates smoothly from local Python code into the SQL database. These are 'integration tests', and this is where the configuration in conftest.py comes in handy. We use a similar structure to the unit tests, but decorate each class with @pytest.mark.integration to declare it as an integration test. Here's an example:

tests\test_loaders.py
import pytest
import pandas as pd
from datetime import date
from src.load.fact_loader import load_stock_prices, get_max_loaded_date
from src.load.dimension_loader import load_dim_dates, load_dim_metadata

# define the helper functions for further testing as shown below
def _dim_date_row(date_str):
    dt = pd.Timestamp(date_str)
    return {
        "date": dt,
        "day": dt.day,
        "month": dt.month,
        "year": dt.year,
        "quarter": ((dt.month - 1) // 3) + 1,
        "day_of_week": dt.dayofweek,
        "week_of_year": int(dt.isocalendar().week)
    }

def _price_row(symbol, date_str, opening=148.0, closing=150.0):
    dt = pd.Timestamp(date_str)
    return {
        "symbol": symbol,
        "date": dt,
        "open": opening,
        "high": closing + 1,
        "low": opening - 1,
        "close": closing,
        "volume": 25000000,
        "day": dt.day,
        "month": dt.month,
        "year": dt.year,
        "quarter": ((dt.month - 1) // 3) + 1,
        "day_of_week": dt.dayofweek,
        "week_of_year": int(dt.isocalendar().week)
    }

# create the classes of testing methods
@pytest.mark.integration
class TestLoadDimDates:
    ...

@pytest.mark.integration
class TestLoadStockPrices:
    def _seed(self, db_cursor, dates=("2026-03-09", "2026-03-08"), symbol="AAPL"):
        """
        Helper method to seed the dim_date and dim_metadata tables with the necessary data for testing the fact loader.
        """
        dim_df = pd.DataFrame([_dim_date_row(d) for d in dates])
        load_dim_dates(db_cursor, dim_df)
        load_dim_metadata(db_cursor, {"symbol": symbol, "company_name": "Apple Inc.", "sector": "TECHNOLOGY"})

    def test_inserts_rows(self, db_cursor):
        """
        Given a blank stock_prices fact table,
        when we load stock price records for two dates,
        then exactly two rows should be inserted.
        """
        self._seed(db_cursor)
        df = pd.DataFrame([_price_row("AAPL", "2026-03-09"), _price_row("AAPL", "2026-03-08")])
        load_stock_prices(db_cursor, df)
        db_cursor.execute("SELECT COUNT(*) FROM stock_prices;")
        assert db_cursor.fetchone()[0] == 2

The integration tests work against the temporary tables defined in conftest.py: they use the actual loader methods to load mock data into those temporary tables, which are then dropped safely so the mock data is never loaded into the production tables.
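The rollback in the fixture's teardown is what guarantees that isolation. Here is a minimal illustration of the same pattern using stdlib sqlite3 as a stand-in (the project itself uses PostgreSQL via psycopg2):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE stock_prices (symbol TEXT, close REAL)")
conn.commit()  # the schema itself is committed, like the TEMP tables' setup

# A "test" inserts mock data inside an open transaction...
cur.execute("INSERT INTO stock_prices VALUES ('AAPL', 150.0)")
cur.execute("SELECT COUNT(*) FROM stock_prices")
rows_during_test = cur.fetchone()[0]   # 1: visible inside the transaction

# ...and the teardown rolls it back, so nothing leaks into other tests
conn.rollback()
cur.execute("SELECT COUNT(*) FROM stock_prices")
rows_after_teardown = cur.fetchone()[0]  # 0: the mock row never persisted
```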

Finally, the full pytest flow can be illustrated as follows:

pytest
├── discovers tests/conftest.py → registers "integration" marker
├── test_transform.py (unit tests)
│ └── no db_cursor → runs immediately, no Docker needed
└── test_loaders.py (integration tests)
└── requests db_cursor fixture
├── tries db_connect()
├── if Docker is down → pytest.skip()
└── if Docker is up → creates TEMP tables → yields cursor → test runs → rollback → cleanup

Then, we run the tests using the following commands:

PowerShell
# use this one to run everything
pytest tests/ -v
# use this one to run only the integration tests
# make sure the Docker PostgreSQL container is running; otherwise, run docker compose up -d first
pytest tests/ -v -m integration

Querying the pipeline

After all tests pass, we can be confident that the ETL pipeline works properly, from API fetching to loading into SQL. In the next phase, we use SQL queries to extract the loaded data into .csv files.

First, we write the SQL queries under the /sql folder. Each .sql file contains one query, for maintainability. For example, we calculate the daily return using the following query:

sql\daily_returns.sql
SELECT
    s.symbol,
    s.date,
    s.close,
    LAG(s.close) OVER (
        PARTITION BY s.symbol
        ORDER BY s.date ASC
    ) AS prev_close,
    ROUND(
        (s.close / LAG(s.close) OVER (PARTITION BY s.symbol ORDER BY s.date ASC)) - 1, 4
    ) AS daily_return
FROM stock_prices s
ORDER BY s.symbol, s.date ASC;
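To sanity-check the query, the same LAG logic can be reproduced in plain Python: group by symbol, order by date, and compare each close to the previous one. The sample prices here are made up for illustration:

```python
# Sample rows as (symbol, date, close); values are illustrative only
rows = [
    ("AAPL", "2026-03-08", 150.0),
    ("AAPL", "2026-03-09", 153.0),
    ("MSFT", "2026-03-08", 400.0),
    ("MSFT", "2026-03-09", 390.0),
]

returns = {}
prev_close = {}  # per-symbol state, mirroring LAG(...) OVER (PARTITION BY symbol ORDER BY date)
for symbol, date, close in sorted(rows):  # sorted by symbol, then date
    prev = prev_close.get(symbol)
    returns[(symbol, date)] = round(close / prev - 1, 4) if prev is not None else None
    prev_close[symbol] = close

print(returns[("AAPL", "2026-03-09")])  # 0.02, i.e. a 2% daily return
```

The first date of each symbol has no previous close, so its daily return is None, exactly as LAG yields NULL for the first row of each partition.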

Once all queries are created (we have 4 at this point — SMA, daily returns, volatility, and cumulative returns), we orchestrate the query process, which has the following steps:

Create Engine with SQL connection string → Lookup the queries → Create output directory → Read the query and export the results → Dispose the engine

We orchestrate the querying process in /src/analytics.py as follows:

src\analytics.py
import pandas as pd
from pathlib import Path
import logging
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os

load_dotenv()
logger = logging.getLogger(__name__)

# create a SQLAlchemy engine for the database connection using a connection string
engine = create_engine(
    f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
)

# lookup the queries from /sql
QUERIES = {
    "sma": Path("sql/sma.sql"),
    "daily_returns": Path("sql/daily_returns.sql"),
    "volatility": Path("sql/volatility.sql"),
    "cumulative_return": Path("sql/cumulative_return.sql")
}

# create the output directory to save the analytics results
OUTPUT_DIR = Path("data/analytics")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# execute each query against the engine and save the results as CSV files
for name, sql_path in QUERIES.items():
    query = sql_path.read_text()
    df = pd.read_sql_query(query, engine)
    output = OUTPUT_DIR / f"{name}.csv"
    df.to_csv(output, index=False)
    logger.info(f"Exported {name} to {output}")

# dispose of the engine to release its connection pool
engine.dispose()
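The export loop itself is independent of PostgreSQL and pandas. Here is a stdlib-only sketch of the same read-query → write-CSV flow, using sqlite3 and a temporary directory as stand-ins for the project's real database and output folder:

```python
import csv
import sqlite3
import tempfile
from pathlib import Path

# Stand-in database with a tiny stock_prices table (illustrative values)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stock_prices (symbol TEXT, date TEXT, close REAL)")
conn.executemany(
    "INSERT INTO stock_prices VALUES (?, ?, ?)",
    [("AAPL", "2026-03-08", 150.0), ("AAPL", "2026-03-09", 153.0)],
)

# Same pattern as analytics.py: one named query -> one CSV file
queries = {"closes": "SELECT symbol, date, close FROM stock_prices ORDER BY date"}
output_dir = Path(tempfile.mkdtemp())

for name, query in queries.items():
    cur = conn.execute(query)
    output = output_dir / f"{name}.csv"
    with output.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([col[0] for col in cur.description])  # header row from cursor metadata
        writer.writerows(cur.fetchall())

content = (output_dir / "closes.csv").read_text()
```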

Conclusion

At this point, we have created a classic ETL pipeline: fetching data from an external source with a public API key, transforming it into the format we need, and loading it into a local SQL database. We also create and manage the SQL schema through the modeling phase, for reproducibility across environments and version control. We then run testing procedures to ensure that the transformation units and the database integration work properly, and finally add an analytics layer that fetches data from the SQL database, performs calculations through SQL queries, and saves the results into .csv files. To make the project executable across environments, everything runs inside a virtual environment with a containerized SQL database.

In the next part, we will move from the local ETL pipeline to the cloud using AWS services. Stay tuned.
