-
Notifications
You must be signed in to change notification settings - Fork 0
/
predict_air_quality.py
executable file
·84 lines (64 loc) · 2.72 KB
/
predict_air_quality.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import boto3
import json
import datetime
def _daily_averages(items):
    """Group raw sensor items by date and return per-day averages, oldest first.

    Args:
        items: DynamoDB-typed items, each carrying a ``Tanggal`` string in
            ``YYYY-MM-DD`` form and a numeric ``Value``.

    Returns:
        A list of DynamoDB map values of the form
        ``{'M': {'Tanggal': {'S': date}, 'Average': {'N': str(mean)}}}``,
        sorted chronologically by ``Tanggal``.
    """
    values_by_date = {}
    for item in items:
        values_by_date.setdefault(item['Tanggal']['S'], []).append(
            float(item['Value']['N'])
        )

    averages = [
        {'M': {'Tanggal': {'S': day}, 'Average': {'N': str(sum(values) / len(values))}}}
        for day, values in values_by_date.items()
    ]
    averages.sort(
        key=lambda entry: datetime.datetime.strptime(
            entry['M']['Tanggal']['S'], '%Y-%m-%d'
        )
    )
    return averages


def lambda_handler(event, context):
    """Forecast upcoming daily averages and store them in DynamoDB.

    Reads every item from the ``SIPAKUSensorData`` table, averages the
    readings per date, feeds the most recent (up to six) daily averages to
    ``get_next_7_days`` and writes the forecast to the
    ``SIPAKUPredictionResult`` table under the fixed ``PredID`` ``"1"``.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda runtime context (not used).

    Returns:
        A dict with ``statusCode`` 200 and a ``body`` holding the
        JSON-encoded ``put_item`` response.

    Raises:
        ValueError: If the sensor table yields no items to predict from.
    """
    sgm_client = boto3.client('sagemaker-runtime', region_name='us-east-1')
    ddb_client = boto3.client('dynamodb', region_name='us-east-1')

    # A single Scan call only returns one page of results; walk every page so
    # the most recent dates are not silently dropped on larger tables.
    paginator = ddb_client.get_paginator('scan')
    pages = paginator.paginate(
        TableName="SIPAKUSensorData",
        AttributesToGet=['Tanggal', 'Value'],
    )
    items = []
    for page in pages:
        items.extend(page['Items'])

    daily_averages = _daily_averages(items)
    if not daily_averages:
        raise ValueError("SIPAKUSensorData returned no items; nothing to predict from")

    # Negative slicing keeps everything when fewer than six days exist; the
    # former ``len(...) - 6`` start index wrapped around and truncated the data.
    recent_days = daily_averages[-6:]

    next_7_days = get_next_7_days(sgm_client, recent_days)
    now = datetime.datetime.now().strftime('%Y-%m-%d')
    # Round-trip through datetime so the stored date is zero-padded.
    start_from = datetime.datetime.strptime(
        recent_days[-1]['M']['Tanggal']['S'], '%Y-%m-%d'
    )
    result = ddb_client.put_item(
        Item={
            "PredID": {'S': "1"},
            "DateCreated": {'S': now},
            "DateStartPredict": {'S': start_from.strftime('%Y-%m-%d')},
            "PredictData": {'L': next_7_days},
        },
        TableName='SIPAKUPredictionResult',
    )
    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }
def predict_next_days(sgm_client, last_7_days):
    """Predict the next day's average and slide the window forward one day.

    The ``Average`` values of the window are sent as one comma-separated
    line to the ``ServiceEndpoint`` SageMaker endpoint. The first score in
    the response becomes the average for the day after the last entry.

    Args:
        sgm_client: Object exposing ``invoke_endpoint`` (a SageMaker runtime
            client in production).
        last_7_days: Chronologically ordered DynamoDB map values of the form
            ``{'M': {'Tanggal': {'S': 'YYYY-MM-DD'}, 'Average': {'N': str}}}``.
            The sequence is not modified.

    Returns:
        A new list: the input without its first entry, followed by the
        predicted entry for the next date.

    Raises:
        ValueError: If ``last_7_days`` is empty.
    """
    if not last_7_days:
        # Fail before calling the endpoint; there is no date to extend from.
        raise ValueError("last_7_days must contain at least one entry")

    input_data = ",".join(
        str(entry['M']['Average']['N']) for entry in last_7_days
    )
    response = sgm_client.invoke_endpoint(
        EndpointName='ServiceEndpoint',
        Body=input_data,
        ContentType='text/csv'
    )
    result = json.loads(response['Body'].read().decode())

    last_date = datetime.datetime.strptime(
        last_7_days[-1]['M']['Tanggal']['S'], '%Y-%m-%d'
    )
    next_date = last_date + datetime.timedelta(days=1)
    predicted = {'M': {
        "Tanggal": {'S': next_date.strftime('%Y-%m-%d')},
        "Average": {'N': str(result['predictions'][0]['score'])}
    }}
    # Drop the oldest day and append the prediction so the window size is kept.
    return list(last_7_days[1:]) + [predicted]
def get_next_7_days(sgm_client, last_7_days_data):
    """Roll the prediction window forward by repeated one-day forecasts.

    Feeds the window to ``predict_next_days`` six times in a row, each time
    passing the window returned by the previous call, and returns the final
    window.

    NOTE(review): despite the name, the loop runs six times, not seven;
    confirm which count is intended.

    Args:
        sgm_client: Client object handed through to ``predict_next_days``.
        last_7_days_data: Starting window of daily entries.

    Returns:
        The window produced by the sixth ``predict_next_days`` call.
    """
    window = last_7_days_data
    remaining_steps = 6
    while remaining_steps > 0:
        window = predict_next_days(sgm_client, window)
        remaining_steps -= 1
    return window