Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# macOS Finder metadata file
.DS_Store
# Local environment file (commonly holds secrets; keep out of version control)
.env
# Local Python virtual environment directory
.venv/
# VS Code workspace settings directory
.vscode/
23 changes: 23 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/cerebrium.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Deployment settings for the "kalshi-bet-predictor" app.
[cerebrium.deployment]
name = "kalshi-bet-predictor"
python_version = "3.11"
docker_base_image_url = "debian:bookworm-slim"
# NOTE(review): auth is disabled here, yet compare.py sends an
# 'Authorization' header with each request - confirm which is intended.
disable_auth = true
include = ['./*', 'main.py', 'cerebrium.toml']
# Presumably keeps dotfiles such as .env out of the upload - TODO confirm glob semantics.
exclude = ['.*']

# Python dependencies are read from requirements.txt.
[cerebrium.dependencies.paths]
pip = "requirements.txt"

# CPU-only hardware; units for cpu/memory are not stated here
# (presumably cores and GB - verify against the platform docs).
[cerebrium.hardware]
cpu = 4
memory = 16
compute = "CPU"

# Scaling: min_replicas = 0 allows zero running replicas; each replica is
# configured for a concurrency of 1.
# NOTE(review): cooldown is presumably in seconds - confirm.
[cerebrium.scaling]
min_replicas = 0
max_replicas = 100
cooldown = 30
replica_concurrency = 1
scaling_metric = "concurrency_utilization"

127 changes: 127 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import csv
import json
from typing import Dict, List, Tuple
import asyncio
import aiohttp

def load_markets(csv_path: str) -> List[Tuple[str, str]]:
    """Read (kalshi_ticker, polymarket_slug) pairs from a CSV file.

    The first row is treated as a header and skipped. Rows with fewer than
    two columns are ignored; columns beyond the second are discarded.

    Args:
        csv_path: Path to the CSV file to read.

    Returns:
        One ``(first_column, second_column)`` tuple per usable data row, in
        file order. An empty file yields an empty list.
    """
    # newline='' is what the csv module expects for correct handling of
    # embedded line breaks; the explicit encoding avoids depending on the
    # platform's locale default.
    with open(csv_path, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        # A default of None keeps a completely empty file from raising
        # StopIteration while skipping the header.
        next(reader, None)
        return [(row[0], row[1]) for row in reader if len(row) >= 2]

async def get_market_data(session: aiohttp.ClientSession, kalshi_ticker: str,
                          polymarket_slug: str, endpoint_url: str) -> Dict:
    """POST one Kalshi/Polymarket pair to the endpoint and flatten the reply.

    The request body is JSON with the keys ``kalshi_ticker`` and
    ``poly_slug``. The reply is expected to carry a ``result`` object with
    ``kalshi`` and ``polymarket`` sub-objects, each exposing ``edge``,
    ``buy_yes`` and ``buy_no``.

    Args:
        session: Shared aiohttp session used to send the request.
        kalshi_ticker: Kalshi market ticker, echoed back in the result.
        polymarket_slug: Polymarket slug, sent as ``poly_slug`` and echoed
            back in the result.
        endpoint_url: URL the request is posted to.

    Returns:
        A flat dict holding both identifiers plus the per-platform edge and
        buy_yes/buy_no values, or ``None`` if anything raised (HTTP error,
        malformed reply, missing key); the failure is printed, not raised.
    """
    # NOTE(review): a reviewer asked why aiohttp was chosen as the HTTP client.
    request_headers = {
        'Authorization': '<YOUR AUTHORIZATION>',
        'Content-Type': 'application/json'
    }
    request_body = json.dumps({
        'kalshi_ticker': kalshi_ticker,
        'poly_slug': polymarket_slug
    })

    try:
        async with session.post(endpoint_url, headers=request_headers, data=request_body) as response:
            response.raise_for_status()
            reply = await response.json()
            print(reply)

            result = reply['result']
            kalshi, polymarket = result['kalshi'], result['polymarket']

            # NOTE(review): a reviewer suggested field names that reveal the
            # value type (boolean vs. percentage); the value types are not
            # visible from this script.
            return {
                'kalshi_ticker': kalshi_ticker,
                'polymarket_slug': polymarket_slug,
                'kalshi_edge': kalshi['edge'],
                'polymarket_edge': polymarket['edge'],
                'kalshi_buy_yes': kalshi['buy_yes'],
                'kalshi_buy_no': kalshi['buy_no'],
                'polymarket_buy_yes': polymarket['buy_yes'],
                'polymarket_buy_no': polymarket['buy_no'],
            }
    except Exception as e:
        # Best-effort: one failing market must not abort the whole batch.
        print(f"Error fetching data for {kalshi_ticker}/{polymarket_slug}: {e}")
        return None

async def analyze_markets_async(csv_path: str, endpoint_url: str) -> List[Dict]:
    """Fetch data for every market pair in the CSV concurrently.

    Args:
        csv_path: CSV file of market pairs, read with ``load_markets``.
        endpoint_url: URL each pair is posted to.

    Returns:
        The per-market result dicts, in CSV order, with failed lookups
        (``None`` results) dropped.
    """
    pairs = load_markets(csv_path)

    print(f"Fetching data for {len(pairs)} markets all at once...")

    # NOTE(review): a reviewer flagged the naming asymmetry between
    # "kalshi_ticker" and "polymarket_slug".
    # One session is shared by all requests; gather() launches them together
    # and returns results in the same order as the input pairs.
    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(*(
            get_market_data(session, ticker, slug, endpoint_url)
            for ticker, slug in pairs
        ))

    return [outcome for outcome in outcomes if outcome is not None]

def compute_statistics(results: List[Dict]) -> None:
    """Print a comparison of Kalshi and Polymarket edges across markets.

    Each entry of ``results`` must provide ``kalshi_edge`` and
    ``polymarket_edge``. The report covers how often each platform has the
    larger edge, the average and maximum absolute difference, and which
    platform has the larger summed edge.

    Args:
        results: Per-market dicts as produced by the fetch step. An empty
            list prints a short notice and returns early.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print("STATISTICS")
    print(heavy_rule)

    if not results:
        print("No results to analyze")
        return

    total_markets = len(results)

    # NOTE(review): a reviewer pointed out that the unit of "edge" (count vs.
    # money, dollars vs. cents) is not visible from this script.
    kalshi_edges = [row['kalshi_edge'] for row in results]
    polymarket_edges = [row['polymarket_edge'] for row in results]
    kalshi_total = sum(kalshi_edges)
    polymarket_total = sum(polymarket_edges)

    paired = list(zip(kalshi_edges, polymarket_edges))
    kalshi_wins = sum(1 for k_edge, p_edge in paired if k_edge > p_edge)
    polymarket_wins = sum(1 for k_edge, p_edge in paired if p_edge > k_edge)
    # Whatever is left is neither strictly greater nor strictly smaller.
    ties = total_markets - kalshi_wins - polymarket_wins

    gaps = [abs(k_edge - p_edge) for k_edge, p_edge in paired]
    mean_gap = sum(gaps) / total_markets
    widest_gap = max(gaps)

    print(f"\nTotal markets analyzed: {total_markets}")
    print("\n" + light_rule)
    print("COMPARISON")
    print(light_rule)
    print(f"Markets with greater Kalshi edge: {kalshi_wins} ({kalshi_wins/total_markets*100:.1f}%)")
    print(f"Markets with greater Polymarket edge: {polymarket_wins} ({polymarket_wins/total_markets*100:.1f}%)")
    print(f"Markets with equal edge: {ties} ({ties/total_markets*100:.1f}%)")
    print(f"\nAverage edge difference: {mean_gap:.4f}")
    print(f"Max edge difference: {widest_gap:.4f}")

    print("\n" + heavy_rule)
    if kalshi_total > polymarket_total:
        advantage = kalshi_total - polymarket_total
        print(f"OVERALL: Kalshi has greater total edge (+{advantage:.4f})")
        print(f"OVERALL: Kalshi has an average edge of (+{advantage/total_markets:.4f}) per market")
    elif polymarket_total > kalshi_total:
        advantage = polymarket_total - kalshi_total
        print(f"OVERALL: Polymarket has greater total edge (+{advantage:.4f})")
        print(f"OVERALL: Polymarket has an average edge of (+{advantage/total_markets:.4f}) per market")
    else:
        print(f"OVERALL: Both platforms have equal total edge")
    print(heavy_rule)

def main():
    """Fetch data for every market pair in the CSV and print the statistics."""
    # Both values are placeholders that must be filled in before running.
    csv_path = '<PATH TO YOUR .csv FILE>'
    # NOTE(review): a reviewer guessed this is the Cerebrium predict URL - confirm.
    endpoint_url = '<YOUR HOSTED ENDPOINT>'

    print("Starting async market analysis...")
    fetched = asyncio.run(analyze_markets_async(csv_path, endpoint_url))

    print(f"\nSuccessfully fetched {len(fetched)} markets")

    compute_statistics(fetched)


if __name__ == "__main__":
    main()
202 changes: 202 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/find_equiv_markets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import csv
import os
import requests
import faiss
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any

# --- Config ---
# Tunables and endpoints used by the market-matching script below.
SIMILARITY_THRESHOLD = 0.70 # minimum cosine similarity for a pair to be kept as a potential match
MAX_MARKET_LIMIT = 40000 # per-platform cap: paging stops once more than this many active & open markets are gathered
TOP_K = 5 # number of top Polymarket markets to check for each Kalshi market
KALSHI_API_URL = "https://api.elections.kalshi.com/trade-api/v2/markets"
POLYMARKET_API_URL = "https://clob.polymarket.com/markets"
OUTPUT_FILE = "markets.csv" # CSV file that reviewed matches are appended to

# ---------------------- API Fetch Functions ----------------------
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


def get_kalshi_markets(timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Page through the Kalshi markets API and collect active binary markets.

    Pages of up to 1000 markets are requested until the API stops returning
    a cursor, a response lacks the ``markets`` key, or more than
    ``MAX_MARKET_LIMIT`` markets have been collected (the final page is kept
    whole, so the result may overshoot the limit).

    Args:
        timeout: Seconds to wait for each HTTP request before giving up, so
            a stalled connection cannot hang the script indefinitely.

    Returns:
        One dict per market with the keys ``platform``, ``title``,
        ``ticker``, ``url``, ``event_url`` and ``close_date``. On any
        ``requests`` error (including a timeout) the error is printed and an
        empty list is returned.
    """
    print("Fetching Kalshi markets...")
    markets_list = []
    cursor = ""
    try:
        while True:
            params = {'limit': 1000}
            if cursor:
                params['cursor'] = cursor

            response = requests.get(KALSHI_API_URL, params=params, timeout=timeout)
            response.raise_for_status()
            data = response.json()

            if 'markets' not in data:
                print("Error: 'markets' key not in Kalshi response.")
                break

            markets_list.extend(
                {
                    'platform': 'Kalshi',
                    'title': market['title'],
                    'ticker': market['ticker'],
                    'url': f"https://kalshi.com/markets/{market['ticker']}",
                    'event_url': f"https://kalshi.com/markets/{market['event_ticker']}",
                    'close_date': market['close_time'],
                }
                for market in data['markets']
                if market['status'] == 'active' and market['market_type'] == 'binary'
            )

            # .get() treats a missing cursor like an empty one (end of
            # pagination) instead of raising an uncaught KeyError.
            cursor = data.get('cursor')
            print(f"Found {len(markets_list)} active and open markets")

            if len(markets_list) > MAX_MARKET_LIMIT or not cursor:
                break

        print(f"Found {len(markets_list)} open binary markets on Kalshi.")
        return markets_list

    except requests.exceptions.RequestException as e:
        print(f"Error fetching Kalshi markets: {e}")
        return []

def get_kalshi_market(ticker: str, timeout: float = 30.0) -> str:
    """Return the title of a single Kalshi market.

    Args:
        ticker: Kalshi market ticker, appended to ``KALSHI_API_URL``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``title`` field of the ``market`` object in the response.

    Raises:
        requests.exceptions.RequestException: On a network failure, timeout
            or non-success HTTP status.
        KeyError: If the response lacks ``market`` or ``title``.
    """
    response = requests.get(f"{KALSHI_API_URL}/{ticker}", timeout=timeout)
    # Surface HTTP errors directly rather than as a confusing KeyError on
    # the error payload below.
    response.raise_for_status()
    payload = response.json()
    return payload['market']['title']

def get_polymarket_markets(timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Page through the Polymarket markets API and collect open markets.

    Pages are requested until a page is empty, the API stops returning a
    cursor, the cursor equals the ``'LTE='`` end marker, or more than
    ``MAX_MARKET_LIMIT`` markets have been collected (the final page is kept
    whole, so the result may overshoot the limit).

    Args:
        timeout: Seconds to wait for each HTTP request before giving up, so
            a stalled connection cannot hang the script indefinitely.

    Returns:
        One dict per market flagged active and not closed, with the keys
        ``platform``, ``title``, ``id``, ``url`` and ``close_date``. On any
        ``requests`` error (including a timeout) the error is printed and an
        empty list is returned.
    """
    print("Fetching Polymarket markets...")
    markets_list = []
    next_cursor = None

    try:
        while True:
            params = {}
            if next_cursor:
                params['next_cursor'] = next_cursor

            response = requests.get(POLYMARKET_API_URL, params=params, timeout=timeout)
            response.raise_for_status()
            data = response.json()

            # .get() treats a missing 'data' key like an empty page instead
            # of raising an uncaught KeyError.
            market_list_page = data.get('data')
            if not market_list_page:
                break

            markets_list.extend(
                {
                    'platform': 'Polymarket',
                    'title': market.get('question'),
                    'id': market.get('condition_id'),
                    'url': f"https://polymarket.com/event/{market.get('market_slug')}",
                    'close_date': market.get('end_date_iso'),
                }
                for market in market_list_page
                if market.get('active') and not market.get('closed')
            )

            next_cursor = data.get('next_cursor')
            print(f"Found {len(markets_list)} active and open markets")

            if len(markets_list) > MAX_MARKET_LIMIT or not next_cursor or next_cursor == 'LTE=':
                break

        print(f"Found {len(markets_list)} open markets on Polymarket.")
        return markets_list

    except requests.exceptions.RequestException as e:
        print(f"Error fetching Polymarket markets: {e}")
        return []


# ---------------------- Matching ----------------------
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


def find_similar_markets(kalshi_markets, polymarket_markets, threshold=0.9, top_k=TOP_K):
    """Pair Kalshi markets with Polymarket markets that have similar titles.

    Titles from both platforms are embedded with the 'all-MiniLM-L6-v2'
    sentence-transformer model using normalized embeddings, so the inner
    product computed by the FAISS index is the cosine similarity.

    Args:
        kalshi_markets: Market dicts, each with a ``title`` key.
        polymarket_markets: Market dicts, each with a ``title`` key.
        threshold: Minimum similarity score for a pair to be kept.
        top_k: How many nearest Polymarket markets to inspect per Kalshi
            market.

    Returns:
        A list of dicts with the keys ``score``, ``kalshi_market`` and
        ``polymarket_market``; empty if either input has no titles.
    """
    print("\nLoading NLP model...")
    encoder = SentenceTransformer('all-MiniLM-L6-v2')

    kalshi_titles = [market['title'] for market in kalshi_markets]
    poly_titles = [market['title'] for market in polymarket_markets]

    if not (kalshi_titles and poly_titles):
        print("Not enough market data to compare.")
        return []

    print("Encoding titles into embeddings...")
    encode_options = {'convert_to_numpy': True, 'normalize_embeddings': True}
    kalshi_vectors = encoder.encode(kalshi_titles, **encode_options)
    poly_vectors = encoder.encode(poly_titles, **encode_options)

    print(f"Building vector index for {len(poly_vectors)} Polymarket markets...")
    # Inner product over normalized vectors gives cosine similarity.
    index = faiss.IndexFlatIP(poly_vectors.shape[1])
    index.add(poly_vectors)

    print(f"Querying top {top_k} nearest Polymarket markets for each Kalshi market...")
    scores, indices = index.search(kalshi_vectors, top_k)

    potential_matches = []
    total = len(kalshi_markets)
    for position, kalshi_market in enumerate(kalshi_markets):
        neighbour_scores = scores[position]
        neighbour_ids = indices[position]
        for rank in range(top_k):
            similarity = float(neighbour_scores[rank])
            if similarity < threshold:
                continue
            potential_matches.append({
                'score': similarity,
                'kalshi_market': kalshi_market,
                'polymarket_market': polymarket_markets[neighbour_ids[rank]]
            })
        # Progress line every 100 Kalshi markets.
        if position % 100 == 0:
            print(f"Processed {position}/{total} Kalshi markets...")

    return potential_matches

def interactive_save(matches: List[Dict[str, Any]]):
    """Show each candidate match and append the approved ones to OUTPUT_FILE.

    For every match the score, both titles and both URLs are printed and the
    user is prompted; answering ``y`` writes a ``kalshi_ticker,
    polymarket_slug`` row. The header row is written only when the output
    file did not exist before this call.

    Args:
        matches: Dicts with ``score``, ``kalshi_market`` (needs ``ticker``,
            ``title``, ``url``) and ``polymarket_market`` (needs ``title``
            and a ``url`` containing ``event/``).
    """
    print("\n--- Review Mode ---")
    print("Press 'y' to save a match, anything else to skip.\n")

    file_exists = os.path.exists(OUTPUT_FILE)
    with open(OUTPUT_FILE, "a", newline='', encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        if not file_exists:
            writer.writerow(["kalshi_ticker", "polymarket_slug"])

        for i, match in enumerate(matches):
            kalshi_market = match['kalshi_market']
            poly_market = match['polymarket_market']

            kalshi_ticker = kalshi_market['ticker']
            # The slug is whatever follows "event/" in the Polymarket URL.
            poly_slug = poly_market['url'].split("event/")[1]
            # The title is already carried in the match, so no extra
            # per-match HTTP request is needed to look it up again.
            kalshi_title = kalshi_market['title']
            poly_title = poly_market['title']
            score = match['score']

            print(f"\nMatch #{i+1} (Score: {score:.4f})")
            print(f"[KALSHI] {kalshi_title}")
            print(f"[POLYMARKET] {poly_title}")
            print(f" > Kalshi URL: {kalshi_market['url']}")
            print(f" > Polymarket URL:{poly_market['url']}")

            choice = input("Save this match? (y/n): ").strip().lower()
            if choice == 'y':
                writer.writerow([kalshi_ticker, poly_slug])
                print("Saved.")
            else:
                print("Skipped.")

    print(f"\nDone. Saved matches to '{OUTPUT_FILE}'.")

def main():
    """Fetch markets from both platforms, match them, and review the matches."""
    kalshi_markets = get_kalshi_markets()
    polymarket_markets = get_polymarket_markets()

    # Both lists are needed; an empty one means a fetch failed or found nothing.
    if not (kalshi_markets and polymarket_markets):
        print("\nCould not fetch markets from one or both platforms. Exiting.")
        return

    candidates = find_similar_markets(kalshi_markets, polymarket_markets, SIMILARITY_THRESHOLD)
    print(f"\n--- Found {len(candidates)} Potential Matches ---")

    if not candidates:
        print("No strong matches found.")
        return

    # Present the strongest matches first.
    ranked = sorted(candidates, key=lambda candidate: candidate['score'], reverse=True)
    interactive_save(ranked)


if __name__ == "__main__":
    main()
Loading