March 17, 2025
Kyle Kaniecki
AI hype is everywhere in the tech landscape today, and there are plenty of rumors and falsehoods about its value. It feels like everyone from large corporations to aspiring startups is lunging at the opportunity to insert AI into their tools and products to capture their share of the tech angel investing budget.
Personally, I think that AI is a crucial tool, but like any tool, it has use cases it excels at and use cases it just doesn't. Maybe this statement will age like milk, but I'm fairly confident that AI in March 2025 is a glorified text generation tool. However, I do realize that my own bias comes from the bad anecdotes I've seen in person and read online. I don't actually understand how AI works behind the scenes; I just see the final "products," even when they aren't applied to the correct or appropriate use cases. So, I set aside some time this spring to truly sit down with a good friend of mine and genuinely learn what AI has to offer and how it works at a lower level.
But first, to keep everyone engaged, let's check out some graphs of the final data + context I used. This same context eventually gets fed into the ML training loop and used to train the AI.
As I am sure many of you reading this have read, a big part of machine learning + AI training is the dataset the model is trained on. Having an extremely clean, broad, and well-labelled dataset is crucial to making sure your algorithms pick up on the best context during training.
So, for my first AI training exercise, I wanted to use a dataset that is very publicly available, easy to validate against multiple sources, and easy to correlate across other domains, so I could attempt to add my own context to the training where I felt it would be important. At the end of the day, I landed on stock market candle data, where I could download 20+ years of very accurate historical data relatively cheaply from various sources. The sources I ended up using were Polygon.io, Tiingo, Alpaca Markets, and Google.
Once I had my sources, I needed to figure out the best way to store, sort, and query the data. Since I was already running Clickhouse on my homelab cluster, I knew it would be a fast + efficient storage choice for all the data I would need to build my custom AI dataset, while also allowing me to access the data easily from any machine. I started by creating a database for each of the data sources on the cluster.
CREATE DATABASE IF NOT EXISTS polygon ON cluster 'your_cluster_here';
CREATE DATABASE IF NOT EXISTS tiingo ON cluster 'your_cluster_here';
CREATE DATABASE IF NOT EXISTS alpaca ON cluster 'your_cluster_here';
CREATE DATABASE IF NOT EXISTS google ON cluster 'your_cluster_here';
Once the databases were created on all instances of my clickhouse cluster, I went ahead and created the tables necessary to store the raw candle data from the various data sources. Since I was only collecting candle data from two sources, I only needed to create these tables in the alpaca + polygon databases. The tables are identical across the databases, but I wanted to make sure each data source remained unpolluted by the others while allowing my algorithms to pull from each without change, so repetition here is actually encouraged.
-- Create the polygon minute aggregate data table
CREATE TABLE IF NOT EXISTS polygon.minute_aggs_v1 on cluster 'your_cluster_here'
(
ts DateTime64(3),
symbol LowCardinality(String),
open Float32,
close Float32,
high Float32,
low Float32,
volume UInt32,
transactions UInt32
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (symbol, ts);
-- Create the alpaca markets minute aggregate data table
CREATE TABLE IF NOT EXISTS alpaca.minute_aggs_v1 ON cluster 'your_cluster_here'
(
ts DateTime64(3),
symbol LowCardinality(String),
open Float32,
close Float32,
high Float32,
low Float32,
volume Float32,
transactions UInt32
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (symbol, ts);
Once the tables for the raw data were created, it was time to actually fill them with data. To do this, I grabbed my API keys from both Polygon and Alpaca Markets and started to write code.
In order to collect the data in the same fashion from all sources and rule out any data collection bugs, I decided to write all the data fetchers in Python. Polygon has convenient gzipped flat files served from an S3-compatible public endpoint that I could grab directly. This worked great, and it was where I started.
Also, to make fetching the data easy and to let me fetch specific date ranges and adjust how the data was being fetched, I decided to put the majority of my tools behind Click commands. Click is an amazing utility that lets me modify how my scripts execute without changing much about the structure of the repo itself or scattering constants everywhere.
An example of the Click command that fetches Polygon flat files, decompresses the gzips in memory, and inserts them in bulk into Clickhouse looks a little like this:
import concurrent.futures

import click
import dateutil.parser

# settings, WORKER_COUNT, fetch_s3_file_data, and process_key are defined
# elsewhere in the repo and are omitted here for brevity.


@click.group(name="polygon")
def polygon_cmd():
pass
@polygon_cmd.command()
@click.option(
"--bucket",
default="flatfiles",
help="The name of the bucket to scan for aggregate data",
)
@click.option("--workers", "-w", default=WORKER_COUNT)
@click.option(
"--start",
default=None,
help="The start date to filter news results from, formatted as YYYY-MM-DD",
)
@click.option(
"--end",
default=None,
help="The end date to filter news results to, formatted as YYYY-MM-DD",
)
def fetch_minute_aggregate_data(
bucket: str, workers: int, start: str | None, end: str | None
):
prefix = "us_stocks_sip"
bucket_keys_by_date = fetch_s3_file_data(
settings.polygon.aws_access_key_id,
settings.polygon.aws_secret_access_key,
bucket,
prefix,
)
start_dt = dateutil.parser.isoparse(start) if start is not None else None
end_dt = dateutil.parser.isoparse(end) if end is not None else None
total_rows_processed = 0
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = []
for date, data in bucket_keys_by_date.items():
dt = dateutil.parser.isoparse(date)
if start_dt is not None and dt < start_dt:
continue
if end_dt is not None and dt > end_dt:
continue
for datum in data:
futures.append(
executor.submit(
process_key,
settings.polygon.aws_access_key_id,
settings.polygon.aws_secret_access_key,
bucket,
datum["bucket_key"],
)
)
        for future in concurrent.futures.as_completed(futures):
            # Each worker returns the number of rows it inserted.
            total_rows_processed += future.result()

    print(f"Added {total_rows_processed} `1 minute` candles to the dataset")
Polygon FlatFile Fetcher
Some implementation details, function definitions, and variable declarations are missing from this small snippet, but the overall idea should still come through. I used AWS's boto3 library to list the Polygon flat-file bucket and put each of the keys into a dict whose keys were dates, easily parsable by standard Python date parsing libraries.
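For context, here is a minimal sketch of what a helper like fetch_s3_file_data could look like; the assumption that each flat-file key ends in a YYYY-MM-DD.csv.gz file name is mine, and Polygon's exact key layout may differ.
import collections
import re

import boto3
from botocore.config import Config


def fetch_s3_file_data(
    aws_access_key_id: str,
    aws_secret_access_key: str,
    bucket: str,
    prefix: str,
) -> dict[str, list[dict]]:
    """List every flat-file key under `prefix` and group the keys by date."""
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    s3 = session.client(
        "s3",
        endpoint_url="https://files.polygon.io",
        config=Config(signature_version="s3v4"),
    )

    # Assumes each key ends in a `YYYY-MM-DD.csv.gz` file name.
    date_re = re.compile(r"(\d{4}-\d{2}-\d{2})\.csv\.gz$")
    keys_by_date: dict[str, list[dict]] = collections.defaultdict(list)

    # The listing is large, so walk it with a paginator.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            match = date_re.search(obj["Key"])
            if match is None:
                continue
            keys_by_date[match.group(1)].append({"bucket_key": obj["Key"]})

    return dict(keys_by_date)
Sketch of the flat-file listing helper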
Once I had the bucket keys organized, I could use that map to send fetch, decompress, and insert jobs to a process pool. I used a process pool here instead of something like asyncio workers because decompressing the gzipped data is rather CPU intensive, and I didn't want some workers being blocked on CPU-heavy, single-threaded tasks. Below is the implementation of the function that actually fetches a single S3 bucket file, decompresses it in memory, and then inserts the data in bulk into Clickhouse.
import csv
import gzip
import io

import boto3
import clickhouse_connect
from botocore.config import Config
from botocore.exceptions import ClientError


def download_raw_csv(s3_client, bucket: str, key: str) -> str:
buf = io.BytesIO()
s3_client.download_fileobj(bucket, key, buf)
buf.seek(0)
decompressed = gzip.GzipFile(fileobj=buf)
return decompressed.read().decode("utf-8")
def process_key(
aws_access_key_id: str,
aws_secret_access_key: str,
bucket_name: str,
bucket_key: str,
):
client = clickhouse_connect.get_client(
host=settings.clickhouse.host,
port=settings.clickhouse.port,
username=settings.clickhouse.username,
password=settings.clickhouse.password,
)
# Initialize a session using your credentials
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
# Create a client with your session and specify the endpoint
s3 = session.client(
"s3",
endpoint_url="https://files.polygon.io",
config=Config(signature_version="s3v4"),
)
ch_rows = []
print(f"Downloading {bucket_key}")
try:
raw_csv = download_raw_csv(s3, bucket_name, bucket_key)
reader = csv.DictReader(raw_csv.splitlines())
for row in reader:
ch_rows.append(transform(row))
    except ClientError:
        # Skip keys that can't be fetched (missing or inaccessible) instead of
        # failing the entire batch.
        pass
if ch_rows:
print(f"Inserting data into clickhouse {bucket_key}")
client.insert(
database="polygon",
table="minute_aggs_v1",
column_names=[
"ts",
"symbol",
"open",
"close",
"high",
"low",
"volume",
"transactions",
],
data=ch_rows,
)
return len(ch_rows)
Boto3 / S3 File Fetcher
With this method, I found that I could run 8-10 processes very comfortably with 32 GB of RAM on my homelab stack. YMMV, but that will download 15+ years of candle data overnight. With data for every ticker going back to 2010 and the table layout above, this is what the clickhouse table looked like after all the data was ingested. Some tables have been truncated to focus on this table specifically, so you may see others in the output if you're following along.
SELECT
`table`,
formatReadableSize(size) AS size,
rows,
days,
formatReadableSize(avgDaySize) AS avgDaySize
FROM
(
SELECT
`table`,
sum(bytes) AS size,
sum(rows) AS rows,
min(min_date) AS min_date,
max(max_date) AS max_date,
max_date - min_date AS days,
size / (max_date - min_date) AS avgDaySize
FROM system.parts
WHERE active
GROUP BY `table`
ORDER BY size DESC
)
Query id: a0377286-ba92-4d98-817e-894f26ad405f
┌─table───────────────────┬─size───────┬───────rows─┬─days─┬─avgDaySize─┐
...
3. │ minute_aggs_v1 │ 74.35 GiB │ 3613938798 │ 0 │ inf YiB │
...
└─────────────────────────┴────────────┴────────────┴──────┴────────────┘
Clickhouse table size query for polygon minute agg data
Fantastic - now I had a cached dataset that I could pull from quickly for my AI training, without relying on network connectivity to Polygon or the latency to/from their datacenters.
The process for collecting data from Alpaca is largely the same, but with a more traditional REST API collection method. This ended up being substantially slower due to Alpaca's free-tier rate limiting, so one day I will likely collect up to 15 years of data to validate my Polygon dataset, but for now a couple of months of data to validate recent Polygon data was plenty for my needs.
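To give a sense of what that REST-based path looks like, below is a rough sketch of paging through Alpaca's v2 stock bars endpoint and bulk inserting the results into the alpaca.minute_aggs_v1 table. It reuses the settings object from the earlier snippets and leaves out rate-limit/backoff handling, so treat it as illustrative rather than the exact code I ran.
import datetime

import clickhouse_connect
import requests

ALPACA_BARS_URL = "https://data.alpaca.markets/v2/stocks/{symbol}/bars"


def fetch_alpaca_minute_bars(symbol: str, start: str, end: str, key_id: str, secret: str) -> int:
    """Page through Alpaca's 1-minute bars for `symbol` and bulk insert them into Clickhouse."""
    client = clickhouse_connect.get_client(
        host=settings.clickhouse.host,
        port=settings.clickhouse.port,
        username=settings.clickhouse.username,
        password=settings.clickhouse.password,
    )
    headers = {"APCA-API-KEY-ID": key_id, "APCA-API-SECRET-KEY": secret}
    params = {"timeframe": "1Min", "start": start, "end": end, "limit": 10_000}

    rows = []
    while True:
        resp = requests.get(ALPACA_BARS_URL.format(symbol=symbol), headers=headers, params=params)
        resp.raise_for_status()
        payload = resp.json()

        for bar in payload.get("bars") or []:
            rows.append(
                (
                    # Bar timestamps come back as RFC3339 strings, e.g. 2024-03-07T14:30:00Z
                    datetime.datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
                    symbol,
                    bar["o"],  # open
                    bar["c"],  # close
                    bar["h"],  # high
                    bar["l"],  # low
                    bar["v"],  # volume
                    bar["n"],  # trade count -> transactions
                )
            )

        # Alpaca paginates with next_page_token; stop once it runs out.
        token = payload.get("next_page_token")
        if not token:
            break
        params["page_token"] = token

    if rows:
        client.insert(
            database="alpaca",
            table="minute_aggs_v1",
            column_names=["ts", "symbol", "open", "close", "high", "low", "volume", "transactions"],
            data=rows,
        )
    return len(rows)
Sketch of an Alpaca bars fetcher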
Speaking of validation, once I had all of these rows, I immediately realized something - I had no easy way of validating that my data collection method was correct, and manually looking through 3.6 billion rows is a daunting task. However, aggregating that data into graphs and displaying it visually was a great way to quickly determine whether I was on the right track. I had already set up Grafana on my kubernetes cluster at home, and by default it comes with a candlestick chart plugin that fits my use case exactly. Here is an example of the raw data, graphed using that plugin:
Ok, so right away there is a problem with our data: it isn't normalized for stock splits. This is great from a "data purity" standpoint, as we can see exactly when a company's stock split happened, but not great when trying to distill patterns out of the raw numbers. The sudden split would likely confuse the AI, since it would have no context around what actually happened; it would just see the stock price tank all of a sudden. To fix this, the data needs to be normalized for stock splits.
Luckily, Polygon has a Stock Splits API that can be queried to get a list of all recorded stock splits for a ticker, along with the split ratio. Saving these to another table in our database and using them to calculate historical candles, adjusted for the split, should give us good data to train on. First, a quick sketch of pulling those splits into Clickhouse, and then the Python I used to calculate + save the normalized data.
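Since the normalization code below reads from a polygon.splits table, here is a hedged sketch of how that table could be created and populated from Polygon's /v3/reference/splits endpoint. The exact DDL, the settings.polygon.api_key attribute, and the single-node ReplacingMergeTree engine are placeholders; all the normalization logic really needs is symbol, execution_date, split_from, and split_to for each ticker.
import datetime

import clickhouse_connect
import requests

SPLITS_URL = "https://api.polygon.io/v3/reference/splits"


def fetch_and_store_splits(ticker: str) -> int:
    """Pull every recorded split for `ticker` from Polygon and store it in Clickhouse."""
    client = clickhouse_connect.get_client(
        host=settings.clickhouse.host,
        port=settings.clickhouse.port,
        username=settings.clickhouse.username,
        password=settings.clickhouse.password,
    )
    # Minimal single-node table; on my cluster this would mirror the
    # ReplicatedReplacingMergeTree setup used for the candle tables.
    client.command(
        """
        CREATE TABLE IF NOT EXISTS polygon.splits
        (
            symbol LowCardinality(String),
            execution_date Date,
            split_from Float32,
            split_to Float32
        )
        ENGINE = ReplacingMergeTree()
        ORDER BY (symbol, execution_date)
        """
    )

    rows = []
    url = SPLITS_URL
    # settings.polygon.api_key is a stand-in for however you store the REST API key.
    params = {"ticker": ticker, "limit": 1000, "apiKey": settings.polygon.api_key}
    while url:
        resp = requests.get(url, params=params)
        resp.raise_for_status()
        payload = resp.json()
        for split in payload.get("results", []):
            rows.append(
                (
                    split["ticker"],
                    datetime.date.fromisoformat(split["execution_date"]),
                    split["split_from"],
                    split["split_to"],
                )
            )
        # Polygon paginates via next_url; the API key still needs to be re-supplied.
        url = payload.get("next_url")
        params = {"apiKey": settings.polygon.api_key}

    if rows:
        client.insert(
            database="polygon",
            table="splits",
            column_names=["symbol", "execution_date", "split_from", "split_to"],
            data=rows,
        )
    return len(rows)
Hypothetical splits fetcher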
def _calculate_normalized_candles(
ticker: str,
start: str | None,
end: str | None,
):
click.echo("Connecting to clickhouse")
clickhouse_client = get_client(
host=settings.clickhouse.host,
port=settings.clickhouse.port,
username=settings.clickhouse.username,
password=settings.clickhouse.password,
send_receive_timeout=300,
settings={
"session_timeout": 120,
},
)
params = {"database": "polygon"}
where_clause = ["symbol = {symbol:String}"]
if start is not None:
params["start"] = dateutil.parser.parse(start)
where_clause.append("ts >= {start:DateTime}")
if end is not None:
params["end"] = dateutil.parser.parse(end)
where_clause.append("ts <= {end:DateTime}")
columns = ["ts", "symbol", "open", "close", "high", "low", "volume", "transactions"]
query = f"""
SELECT
{",".join(columns)}
FROM {{database:Identifier}}.{{table:Identifier}}
WHERE (
{" AND ".join(where_clause)}
)
ORDER BY ts DESC
"""
split_query = """
SELECT
execution_date,
split_from,
split_to
FROM {database:Identifier}.{table:Identifier}
WHERE symbol = {symbol:String}
ORDER BY execution_date DESC
"""
splits = list(
clickhouse_client.query(
split_query,
parameters={"table": "splits", "symbol": ticker, **params},
).named_results()
)
current_idx = 0
split_ratio = 1
current_split = splits[current_idx] if current_idx < len(splits) else None
with clickhouse_client.query_row_block_stream(
query,
parameters={"table": "minute_aggs_v1", "symbol": ticker, **params},
) as stream:
new_rows = []
for block in stream:
for row in block:
ts = row[0]
if current_split and ts.date() < current_split["execution_date"]:
split_ratio *= (
current_split["split_from"] / current_split["split_to"]
)
current_idx += 1
current_split = (
splits[current_idx] if current_idx < len(splits) else None
)
new_rows.append(
(
row[0],
row[1],
row[2] * split_ratio,
row[3] * split_ratio,
row[4] * split_ratio,
row[5] * split_ratio,
int(row[6] / split_ratio),
row[7],
)
)
if len(new_rows) > 50_000:
print(f"Inserting {len(new_rows)} {ticker} rows into clickhouse")
clickhouse_client.insert(
database="polygon",
table="minute_aggs_norm",
column_names=columns,
data=new_rows,
)
new_rows = []
if new_rows:
print(f"Inserting {len(new_rows)} {ticker} rows into clickhouse")
clickhouse_client.insert(
database="polygon",
table="minute_aggs_norm",
column_names=columns,
data=new_rows,
)
return ticker
Stupid and simple, but it works and spits out the data needed. After creating a table identical to the raw candle table, called minute_aggs_norm, and graphing it in Grafana, the chart looks like the following. Eventually, I would like to move the raw data out of clickhouse and into a ceph bucket, in a format that Clickhouse can read natively using the S3 table engine (roughly sketched below), but for now this is ok and will allow training to start.
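As a rough illustration of that direction, Clickhouse can read Parquet straight out of an S3-compatible store like a Ceph RGW bucket via the s3 table function (the query-side counterpart of the S3 table engine). The endpoint, bucket path, and credentials below are placeholders, not my actual setup:
import clickhouse_connect

# Same settings object as the earlier snippets; the Ceph endpoint, bucket
# layout, and credentials here are made up purely for illustration.
client = clickhouse_connect.get_client(
    host=settings.clickhouse.host,
    port=settings.clickhouse.port,
    username=settings.clickhouse.username,
    password=settings.clickhouse.password,
)

result = client.query(
    """
    SELECT symbol, count() AS candles
    FROM s3(
        'https://ceph-rgw.internal/market-data/polygon/minute_aggs/*.parquet',
        'CEPH_ACCESS_KEY',
        'CEPH_SECRET_KEY',
        'Parquet'
    )
    GROUP BY symbol
    ORDER BY candles DESC
    LIMIT 10
    """
)
for symbol, candles in result.result_rows:
    print(symbol, candles)
Sketch of querying Parquet from an S3-compatible bucket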
In the next blog post, I will go over the training dataset + dataloader, some deployment strategies on top of Kubernetes, and some initial results using only the candle data as inputs. Exciting stuff!