Skip to content

Commit

Permalink
added data visualizer for chart api
Browse files Browse the repository at this point in the history
  • Loading branch information
MIKEINTOSHSYSTEMS committed Dec 11, 2024
1 parent 8039ae4 commit 37ebf49
Show file tree
Hide file tree
Showing 93 changed files with 17,279 additions and 2,298 deletions.
222 changes: 126 additions & 96 deletions ai/mod/hispmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,118 +41,110 @@ def load_chat_history():
groq_chat = ChatGroq(groq_api_key=os.getenv("GROQ_API_KEY"), model_name=LLM_MOD, temperature=0.2)
conversation = ConversationChain(llm=groq_chat, memory=memory)

# Function to fetch JSON data from the provided API URL
def fetch_json_data():
url = "https://hispmd.merqconsultancy.org/app/api/v1.php?apikey=$2y$10$vdzwxfOVSYoy9xMYgISLjuNfKYxGCa4RpQRTTm5kU3qcMDxupd72C&table=hispmd_indicators_data&action=list"
response = requests.get(url)
# Function to fetch JSON data from the API with dynamic filtering
def fetch_json_data(query_params):
base_url = "https://hispmd.merqconsultancy.org/app/api/v1.php?apikey=$2y$10$vdzwxfOVSYoy9xMYgISLjuNfKYxGCa4RpQRTTm5kU3qcMDxupd72C&table=hispmd_indicators_data&action=list"
query_string = "&q=" + query_params
response = requests.get(base_url + query_string)
if response.status_code == 200:
return response.json()["data"]
else:
return None

# Get all indicator names for fuzzy matching
def get_indicator_names(json_data):
return [entry["Indicator Name"] for entry in json_data]

# Fuzzy match function to find the best match or suggest alternatives
def fuzzy_search(query, json_data):
indicator_names = get_indicator_names(json_data)
match_result = process.extractOne(query, indicator_names, scorer=fuzz.token_sort_ratio)

if match_result and match_result[1] > 70: # match_result[1] is the score
return match_result[0] # return the best match
else:
# Suggest up to 3 closest matches
suggestions = process.extract(query, indicator_names, scorer=fuzz.token_sort_ratio, limit=3)
return suggestions

# Function to filter and summarize data based on user input
def filter_data(json_data, query):
filtered_data = []
indicator_name = fuzzy_search(query, json_data)

if indicator_name:
filtered_data = [
entry for entry in json_data
if isinstance(entry.get("Indicator Name"), str) and indicator_name.lower() in entry["Indicator Name"].lower()
]
# Function to parse user query and build dynamic filters for API search
def parse_query(user_input):
filters = []
if "indicator" in user_input.lower():
filters.append(f"Indicator Name~equals~{user_input.split('indicator')[-1].strip()}")
if "year" in user_input.lower():
year = [word for word in user_input.split() if word.isdigit()]
if year:
filters.append(f"Year~equals~{year[0]}")
if "scope" in user_input.lower():
scope = [word for word in user_input.split() if word.lower() in ["national", "regional", "global"]]
if scope:
filters.append(f"Scope~equals~{scope[0].capitalize()}")
if "region" in user_input.lower():
region = [word for word in user_input.split() if word.lower() not in ["year", "scope", "indicator"]]
if region:
filters.append(f"Region~equals~{' '.join(region)}")

# Combine filters into one query string
query = "(" + ")(".join(filters) + ")"
return query

# Function to perform fuzzy matching for an indicator name in the dataset
def fuzzy_search(query, data):
# Extract indicator names from the data
indicator_names = [entry["Indicator Name"] for entry in data]

return filtered_data

# Function to create a natural language summary
def summarize_data(filtered_data):
if not filtered_data:
return "Sorry, no matching data was found."

# Generate summary
summary = ""
indicator_name = filtered_data[0].get("Indicator Name", "Unknown Indicator")
summary += f"The indicator '{indicator_name}' has the following data:\n\n"

for entry in filtered_data:
year = entry.get("Year", "Unknown Year")
value = entry.get("Value", "Unknown Value")
region = entry.get("Region", "Unknown Region")
data_source = entry.get("Data Source", "Unknown Data Source")
summary += f"- In {year}, the value was {value} in {region} (Source: {data_source}).\n"

return summary

# Function to compare values across different regions or years
def compare_values(filtered_data, compare_by="Region"):
comparison = {}

for entry in filtered_data:
key = entry.get(compare_by, "Unknown")
value = float(entry.get("Value", 0))
if key in comparison:
comparison[key] += value
else:
comparison[key] = value

# Return max and min comparisons
if comparison:
max_key = max(comparison, key=comparison.get)
min_key = min(comparison, key=comparison.get)
return f"The highest value for '{compare_by}' is in {max_key} with {comparison[max_key]} and the lowest is in {min_key} with {comparison[min_key]}."
else:
return "Sorry, not enough data to compare."
# Perform fuzzy matching using the 'process' function
matches = process.extract(query, indicator_names, scorer=fuzz.ratio, limit=5)

# Return the matched indicator names
return matches if matches else None

# Function to calculate aggregates (mean, sum, min, max) for the filtered data
def calculate_aggregates(data):
values = [float(entry["Value"]) for entry in data if entry["Value"]]
if values:
return {
"mean": sum(values) / len(values),
"sum": sum(values),
"min": min(values),
"max": max(values)
}
return None

# Function to handle user message and response
def send_message():
formatted_input = st.session_state.user_input.replace("\n", "\n\n")
response = None

# Parse user input to build dynamic query
query_params = parse_query(formatted_input)

# Fetch the JSON data
json_data = fetch_json_data()
json_data = fetch_json_data(query_params)

if json_data:
user_input_lower = formatted_input.lower()

if "how many regions" in user_input_lower:
unique_regions = set(entry["Region"] for entry in json_data if entry["Region"])
response = f"There are {len(unique_regions)} regions."

elif "compare" in user_input_lower:
filtered_data = filter_data(json_data, user_input_lower)
response = compare_values(filtered_data)

elif "show me" in user_input_lower or "give me" in user_input_lower:
filtered_data = filter_data(json_data, user_input_lower)
if filtered_data:
df = pd.DataFrame(filtered_data)
df = df[['Year', 'Value', 'Data Source', 'Assessment', 'Region']]
st.table(df)
response = summarize_data(filtered_data)
else:
response = "Sorry, I couldn't find any data matching your query."

# Fuzzy search for indicator
matched_indicator = fuzzy_search(formatted_input, json_data)

if matched_indicator:
if isinstance(matched_indicator, list): # If suggestions are provided
response = "I found these possible matches for your query:\n"
for idx, suggestion in enumerate(matched_indicator):
# Make each suggestion clickable as a button with a unique key based on index
button_label = suggestion[0]
button_key = f"button_{idx}_{button_label}" # Unique key for each button
if st.button(button_label, key=button_key): # When clicked, run the query for that indicator
# Filter the data for the selected indicator
selected_indicator_name = button_label
filtered_data = [
entry for entry in json_data
if entry["Indicator Name"].lower() == selected_indicator_name.lower()
]
# Recalculate aggregates or trends based on the query type
response = process_selected_indicator(filtered_data, user_input_lower, selected_indicator_name)
break
else: # Exact match found
matched_indicator_name = matched_indicator[0][0]
filtered_data = [
entry for entry in json_data
if entry["Indicator Name"].lower() == matched_indicator_name.lower()
]
response = f"I found the following data for '{matched_indicator_name}':\n"
for entry in filtered_data:
response += (
f"- Year: {entry['Year']}, Region: {entry['Region'] or 'unspecified'}, "
f"Value: {entry['Value']}, Data Source: {entry['Data Source']}, "
f"Scope: {entry['Scope']}, Assessment: {entry['Assessment']}\n"
)
else:
filtered_data = filter_data(json_data, user_input_lower)
if filtered_data:
response = summarize_data(filtered_data)
else:
response = "Sorry, I couldn't recognize the indicator you mentioned."
response = "Sorry, I couldn't recognize the indicator you're asking about. Could you provide more details?"
else:
response = "Unable to fetch data from the server."

Expand All @@ -168,6 +160,42 @@ def send_message():
session.add(new_entry)
session.commit()

st.write(response)

# Function to process selected indicator after button click
def process_selected_indicator(filtered_data, user_input_lower, selected_indicator_name):
if "highest value" in user_input_lower:
max_value_entry = max(filtered_data, key=lambda x: float(x["Value"]) if x["Value"] else float('-inf'))
return (
f"The region with the highest value for '{selected_indicator_name}' is "
f"{max_value_entry['Region'] or 'unspecified'} with a value of {max_value_entry['Value']} "
f"in the year {max_value_entry['Year']}."
)
elif "lowest value" in user_input_lower:
min_value_entry = min(filtered_data, key=lambda x: float(x["Value"]) if x["Value"] else float('inf'))
return (
f"The region with the lowest value for '{selected_indicator_name}' is "
f"{min_value_entry['Region'] or 'unspecified'} with a value of {min_value_entry['Value']} "
f"in the year {min_value_entry['Year']}."
)
elif "average value" in user_input_lower or "mean value" in user_input_lower:
aggregates = calculate_aggregates(filtered_data)
if aggregates:
return (
f"The average value for '{selected_indicator_name}' across all regions and years is "
f"{aggregates['mean']:.2f}. Sum: {aggregates['sum']:.2f}, Min: {aggregates['min']}, Max: {aggregates['max']}."
)
else:
return "No valid values found for averaging."
elif "trend" in user_input_lower:
trend_data = sorted(filtered_data, key=lambda x: x["Year"])
return (
f"Here is the trend for '{selected_indicator_name}' over the years:\n"
+ "\n".join([f"Year: {entry['Year']}, Value: {entry['Value']}" for entry in trend_data])
)
else:
return f"Here is the data for '{selected_indicator_name}':\n" + "\n".join([str(entry) for entry in filtered_data])

# UI Components
with st.expander("Chat", expanded=True):
user_input = st.text_area("Ask a question:", height=200, value=st.session_state.user_input if 'user_input' in st.session_state else "", key="user_input")
Expand All @@ -176,6 +204,8 @@ def send_message():
# Display chat history
chat_history_container = st.container()
chat_history_container.write("Chat History:")

# Iterate through the chat history in reverse order
for message in reversed(st.session_state.chat_history):
if isinstance(message, dict) and "human" in message and "AI" in message:
chat_history_container.markdown(f"**{message['human']}**\n\n_{message['AI']}_") # Ensure newline handling
st.write(f"You: {message['human']}")
st.write(f"Bot: {message['AI']}")
Loading

0 comments on commit 37ebf49

Please sign in to comment.