-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsentiment.py
73 lines (57 loc) · 2.1 KB
/
sentiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import httpx
from prefect import flow, task
from datetime import timedelta
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact
# Get news data
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=0.1))
def get_news():
print("news")
markdown_content = f""" # News """
create_markdown_artifact(
key="sentiment-report",
markdown=markdown_content,
description="New Summary",
)
return "news"
# Compute sentiment and store scores
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=0.3))
def calculate_sentiment_scores(news):
print("sentiment_scores")
markdown_content = f""" # Sentiment Scores """
create_markdown_artifact(
key="sentiment-report",
markdown=markdown_content,
description="Sentiment Scores",
)
return "sentiment_scores"
# Aggregate sentiment scores
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=0.2))
def aggregate_sentiment(sentiment_scores):
print("aggregate sentiment")
markdown_content = f""" # Aggregated Sentiment """
create_markdown_artifact(
key="sentiment-report",
markdown=markdown_content,
description="Aggregated Sentiment",
)
return "aggregate sentiment"
# Send sentiment and trade signal
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=0.4))
def send_recommendation(aggregated_sentiment):
print("send recommendation")
markdown_content = f""" # Recommendation """
create_markdown_artifact(
key="sentiment-report",
markdown=markdown_content,
description="Recommendation",
)
return "send recommendation"
# Put all the tasks together into a flow
@flow(log_prints=True)
def sentiment_flow():
news = get_news()
sentiment_scores = calculate_sentiment_scores(news=news)
aggregated_sentiment = aggregate_sentiment(sentiment_scores=sentiment_scores)
sent_sentiment = send_recommendation(aggregated_sentiment=aggregated_sentiment)
print(news, sentiment_scores, aggregated_sentiment, sent_sentiment)