-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoci_read_streaming.py
40 lines (30 loc) · 1.08 KB
/
oci_read_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os, oci
from oci.config import validate_config
from dotenv import load_dotenv
from base64 import b64decode, b64encode
# Load settings via python-dotenv before any environment lookups below.
load_dotenv()

instance_dict = {}

# SDK configuration: identity values come from the environment, while the
# region is fixed in code.
config = {
    "user": os.getenv('authUserID'),
    "key_file": os.getenv('ociKeyPath'),
    "fingerprint": os.getenv('keyFingerPrint'),
    "tenancy": os.getenv('tenancyID'),
    "region": "us-ashburn-1",
}

# Streaming service endpoint and the stream this script reads from.
endpoint = "https://cell-1.streaming.us-ashburn-1.oci.oraclecloud.com"
streamingID = os.getenv('streamingID')

# Check the configuration first, then build the client shared by the
# functions below.
validate_config(config)
streaming = oci.streaming.StreamClient(config, endpoint)
def getMessages(streamingID):
    """Fetch one batch of messages from a stream and return them as text.

    A cursor is created from the details supplied by ``getCursor()`` and a
    single ``get_messages`` request is issued with it; no further requests
    are made after that one batch.

    Args:
        streamingID: Identifier of the stream, passed to both
            ``create_cursor`` and ``get_messages``.

    Returns:
        A list of ``str``, one per message in the response, each obtained
        by base64-decoding the message value and decoding the bytes as
        UTF-8. The list is empty when the response carries no messages.
    """
    cursor_response = streaming.create_cursor(streamingID, getCursor())
    batch = streaming.get_messages(streamingID, cursor_response.data.value)
    # `or []` keeps an empty/absent payload from raising; iterating an empty
    # list already yields no results, so no separate length check is needed.
    return [
        b64decode(message.value).decode('utf-8')
        for message in batch.data or []
    ]
def getCursor():
    """Build the cursor-creation details used by ``getMessages``.

    Returns:
        An ``oci.streaming.models.CreateCursorDetails`` with ``partition``
        set to ``"0"`` and ``type`` set to ``"TRIM_HORIZON"``.
    """
    return oci.streaming.models.CreateCursorDetails(
        partition="0",
        type="TRIM_HORIZON",
    )
# Script entry: read the configured stream once and print the decoded messages.
print(getMessages(streamingID=streamingID))