-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdata-ingestion.camel.yaml
77 lines (77 loc) · 2.49 KB
/
data-ingestion.camel.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Ingestion route: consumes CSV payloads from the Kafka "transactions" topic,
# parses them, and forwards every parsed record individually to
# direct:store-record-endpoint.
- route:
    id: data-ingestion
    description: Data ingestion
    from:
      id: from-2954
      description: Consume transactions
      uri: kafka:transactions
      parameters:
        brokers: kafka.rm066rh-dev.svc.cluster.local:9092
        clientId: data-ingestion-client
        # Folded scalar: the lines below are joined with single spaces into one
        # JAAS configuration string. The password is resolved from a secret
        # placeholder rather than being stored in this file.
        saslJaasConfig: >-
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="user1"
          password="{{secret:kafka-user-passwords/client-passwords}}";
        saslMechanism: SCRAM-SHA-256
        # NOTE(review): SASL_PLAINTEXT authenticates the client but does not
        # use TLS for the connection - confirm this is intended for the
        # target cluster.
        securityProtocol: SASL_PLAINTEXT
      steps:
        # Turn the raw CSV text into a collection of records.
        - unmarshal:
            id: unmarshal-1178
            description: Parse CSV format
            csv:
              allowMissingColumnNames: true
              trim: true
        # Iterate over the parsed records. The storage call must live inside
        # the split's own steps: a step placed after the split would receive
        # the whole unsplit payload instead of one record per exchange.
        - split:
            id: split-6116
            description: Split CSV records
            expression:
              simple:
                expression: ${body}
            steps:
              - to:
                  id: to-2761
                  description: Send the record to storage
                  uri: direct:store-record-endpoint
                  parameters: {}
# Storage route: receives a message on direct:store-record-endpoint, copies
# three positional body elements into headers, and inserts them as one row
# into the "transactions" table.
- route:
    id: record-storage
    description: Record storage into DB
    from:
      id: from-3238
      uri: direct:store-record-endpoint
      parameters: {}
      steps:
        # The body is read by index (0, 1, 2).
        # NOTE(review): presumably the body is a single parsed CSV record of
        # the form [timestamp, account, amount] - confirm against the caller.
        - setHeaders:
            id: setHeaders-3758
            headers:
              - name: timestamp
                expression:
                  simple:
                    expression: '${body[0]}'
                    resultType: java.lang.String
              - name: account
                expression:
                  simple:
                    expression: '${body[1]}'
                    resultType: java.lang.String
              # NOTE(review): the amount is converted to java.lang.Float;
              # verify that single precision is acceptable for this column.
              - name: amount
                expression:
                  simple:
                    expression: '${body[2]}'
                    resultType: java.lang.Float
        # Folded scalar: the two lines below are joined with a single space
        # into one endpoint URI. Values are bound as named SQL parameters
        # taken from the headers set above, not concatenated into the SQL.
        - to:
            id: to-1127
            uri: >-
              sql:INSERT INTO transactions (timestamp, account, amount)
              VALUES (:#${headers.timestamp}, :#${headers.account}, :#${headers.amount});
            parameters:
              # "#name" refers to the bean registered under that name.
              dataSource: '#PostgresqlDataSource'
            disabled: false
# Bean definitions. PostgresqlDataSource is the JDBC data source that the
# SQL endpoint references as "#PostgresqlDataSource".
- beans:
    - name: PostgresqlDataSource
      type: org.postgresql.ds.PGSimpleDataSource
      properties:
        # Connection target.
        serverName: postgres.rm066rh-dev.svc.cluster.local
        # Kept as a quoted string, exactly as in the original definition.
        portNumber: '5432'
        # Database name and credentials are resolved from secret placeholders
        # so that no sensitive value is stored in this file.
        databaseName: '{{secret:postgres/database-name}}'
        user: '{{secret:postgres/database-user}}'
        password: '{{secret:postgres/database-password}}'