Skip to content

Commit

Permalink
Attributes updates and oee (#389)
Browse files Browse the repository at this point in the history
* Enable attributes updates and gives oee object example

- Attribute updates are now done with Windowing and Upsert channels, enabled by Flink 1.16
- Updates have typically the challenge that retracted changes create unneccessary noise on Kafka.
  This is solved by upsert-to-upsert updates.
- OEE objects allow to request online OEE calculation and are an example for combining time-series data with
  structured data and data views.

* Fix flink not waiting for minio when external S3 is used
  • Loading branch information
wagmarcel authored May 19, 2023
1 parent 672762d commit b8672be
Show file tree
Hide file tree
Showing 48 changed files with 4,431 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ data:
taskmanager.memory.process.size: 1728m
parallelism.default: {{ .Values.flink.defaultParalellism }}
s3.endpoint: {{ printf "%s://%s" .Values.s3.protocol .Values.s3.endpoint }}
{{- if .Values.minio.enabled }}
s3.path.style.access: true
{{- end }}
s3.access-key: {{ .Values.s3.userAccessKey }}
{{ if $secret }}
s3.secret-key: {{ $secret.data.CONSOLE_SECRET_KEY | b64dec }}
Expand Down
2 changes: 1 addition & 1 deletion helm/charts/scorpio/templates/scorpio-aaio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ spec:
spec:
containers:
- name: {{ .Values.aaio.name }}
image: {{ .Values.mainRepo }}/{{ .Values.aaio.name }}:{{ .Values.aaio.image.tag }}
image: {{ .Values.mainRepo }}/{{ .Values.aaio.name }}:{{ .Values.mainVersion }}
imagePullPolicy: {{ .Values.aaio.image.pullPolicy }}
env:
- name: CLIENT_ID
Expand Down
2 changes: 1 addition & 1 deletion helm/charts/sql-core/templates/core-kafka-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ spec:
partitions: 1
replicas: 1
config:
retention.ms: '{{.Values.kafkaBridge.debezium.entityTopicRetention}}'
retention.ms: '{{.Values.kafkaBridge.alerta.bulkTopicRetention}}'
topicName: {{.Values.kafkaBridge.debezium.attributesTopic}}_insert
24 changes: 17 additions & 7 deletions helm/charts/sql-core/templates/core-statementsets.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
# yamllint disable rule:line-length
apiVersion: industry-fusion.com/v1alpha1
apiVersion: industry-fusion.com/v1alpha2
kind: BeamSqlStatementSet
metadata:
name: core-services
Expand Down Expand Up @@ -70,15 +70,25 @@ spec:
GROUP BY window_start, window_end, entityId, name
) GROUP BY window_start, window_end, id;
- |-
/* copy changes from attributes-inserts back to attributes */
insert into attributes
SELECT id, last_value(entityId), last_value(name), last_value(nodeType), last_value(valueType), index, last_value(`type`), last_value(`https://uri.etsi.org/ngsi-ld/hasValue`), last_value(`https://uri.etsi.org/ngsi-ld/hasObject`), window_start
FROM TABLE(
TUMBLE(TABLE attributes_insert_filter, DESCRIPTOR(`ts`), INTERVAL {{.Values.flink.attributeInsertWindow|squote}} SECOND))
GROUP BY window_start, window_end, id, index;
/* forward only non-retracted attribute changes
This is part of the core service */
insert into attributes_writeback
select id, entityId, name, nodeType, valueType, `index`, `type`, `value`, `object` from
(select id, last_value(entityId) as entityId, last_value(name) as name, last_value(nodeType) as nodeType, last_value(valueType) as valueType , `index`, last_value(`type`) as `type`,
last_value(`https://uri.etsi.org/ngsi-ld/hasValue`) as `value`,
last_value(`https://uri.etsi.org/ngsi-ld/hasObject`) as `object`,
TUMBLE_START(ts, INTERVAL '0.001' SECOND),
TUMBLE_END(ts, INTERVAL '0.001' SECOND)
from attributes_insert
group by id, `index`, TUMBLE(ts, INTERVAL '0.001' SECOND));
tables:
- alerts
- alerts-filter
- attributes
- attributes-writeback
- attributes-insert
- attributes-insert-filter
- ngsild-updates
views:
- attributes-insert-view
57 changes: 54 additions & 3 deletions helm/charts/sql-core/templates/core-tables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,47 @@ spec:
- 'type': STRING
- 'https://uri.etsi.org/ngsi-ld/hasValue': STRING
- 'https://uri.etsi.org/ngsi-ld/hasObject': STRING
- 'ts': TIMESTAMP(3) METADATA FROM 'timestamp'
- 'ts': TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
- 'watermark': FOR ts AS ts
kafka:
topic: {{.Values.kafkaBridge.debezium.attributesTopic}}
properties:
bootstrap.servers: {{.Values.kafka.bootstrapServer}}
scan.startup.mode: earliest-offset
scan.startup.mode: latest-offset
value:
format: json
json.fail-on-missing-field: false
json.ignore-parse-errors: true
---
apiVersion: industry-fusion.com/v1alpha2
kind: BeamSqlTable
metadata:
name: attributes-writeback
spec:
name: attributes_writeback
connector: upsert-kafka
fields:
- id: STRING
- entityId: STRING
- name: STRING
- nodeType: STRING
- valueType: STRING
- index: INTEGER
- type: STRING
- https://uri.etsi.org/ngsi-ld/hasValue: STRING
- https://uri.etsi.org/ngsi-ld/hasObject: STRING
kafka:
topic: iff.ngsild.attributes
properties:
bootstrap.servers: my-cluster-kafka-bootstrap:9092
key.format: json
value:
format: json
json.fail-on-missing-field: false
json.ignore-parse-errors: true
primaryKey:
- id
- index
---
apiVersion: industry-fusion.com/v1alpha1
kind: BeamSqlView
Expand All @@ -140,7 +170,8 @@ spec:
sqlstatement: |
SELECT id, entityId, name, nodeType, valueType, index, `type`,
`https://uri.etsi.org/ngsi-ld/hasValue`,
`https://uri.etsi.org/ngsi-ld/hasObject`
`https://uri.etsi.org/ngsi-ld/hasObject`,
`ts`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY `id`
Expand All @@ -165,6 +196,8 @@ spec:
- type: STRING
- https://uri.etsi.org/ngsi-ld/hasValue: STRING
- https://uri.etsi.org/ngsi-ld/hasObject: STRING
- ts: TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
- watermark: FOR `ts` AS `ts`
kafka:
topic: iff.ngsild.attributes_insert
properties:
Expand Down Expand Up @@ -206,3 +239,21 @@ spec:
format: json
json.fail-on-missing-field: false
json.ignore-parse-errors: true
---
apiVersion: industry-fusion.com/v1alpha1
kind: BeamSqlView
metadata:
name: attributes-insert-view
spec:
name: attributes_insert_view
sqlstatement: |
SELECT id, entityId, name, nodeType, valueType, index, `type`,
`https://uri.etsi.org/ngsi-ld/hasValue`,
`https://uri.etsi.org/ngsi-ld/hasObject`,
`ts`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY `id`
ORDER BY ts DESC) AS rownum
FROM `attributes_insert_filter`)
WHERE rownum = 1 and entityId is NOT NULL;
7 changes: 4 additions & 3 deletions helm/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ flink:
defaultParalellism: 1
alertWindow: '0.001'
ngsildUpdateWindow: '0.001'
attributeInsertWindow: '0.1'
attributeInsertWindow: '0.001'

minio:
enabled: {{ .StateValues.minio.enabled }}
Expand All @@ -88,11 +88,12 @@ minio:
adminAccessKey: console
mcImage: minio/mc:RELEASE.2023-02-28T00-12-59Z


s3:
{{- $s3 := dict "" }}
{{- $protocol := "http" }}
{{- if (hasKey .StateValues "s3") }}{{- $s3 = .StateValues.s3 }}{{- end }}
protocol: {{- if (hasKey $s3 "protocol") }} {{ quote $s3.protocol }} {{- else }} http {{- end }}
protocol: {{ if (hasKey $s3 "protocol") }}{{ quote $s3.protocol }}{{- else }} http {{- end }}
endpoint: {{ if (hasKey $s3 "endpoint") }}{{ quote $s3.endpoint }}{{- else }} minio.{{ .StateValues.namespace}}.svc.cluster.local {{- end }}
userAccessKey: {{ if (hasKey $s3 "userAccessKey") }}{{ quote $s3.userAccessKey }}{{- else }} minio {{- end }}
userSecretKey: {{ if (hasKey $s3 "userSecretKey") }}{{ quote $s3.userSecretKey }}{{- else }} "{{ .StateValues.minioUserSecretKey }}" {{- end }}
Expand Down Expand Up @@ -234,7 +235,7 @@ velero:
accessMode: ReadWrite
config:
region: de
s3Url: {{- if (hasKey $s3 "protocol") }} {{ quote $s3.protocol }} {{- else }} http{{- end }}://{{ if (hasKey $s3 "endpoint") }}{{ quote $s3.endpoint }}{{- else }}minio.{{ .StateValues.namespace }}.svc.cluster.local{{- end }}
s3Url: {{ if (hasKey $s3 "protocol") }} {{ $s3.protocol }} {{- else }} http{{- end }}://{{ if (hasKey $s3 "endpoint") }}{{ quote $s3.endpoint }}{{- else }}minio.{{ .StateValues.namespace }}.svc.cluster.local{{- end }}
s3ForcePathStyle: {{- if .StateValues.minio.enabled }} true {{- else }} false {{- end }}
defaultVolumesToRestic: true
credentials:
Expand Down
24 changes: 18 additions & 6 deletions semantic-model/kms/knowledge.ttl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX iff: <https://industry-fusion.com/types/v0.9/>
PREFIX oee: <https://industry-fusion.com/oee/v0.9/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>

### Test classes
Expand All @@ -26,10 +27,12 @@ iff:workpiece rdfs:subClassOf iff:entity ;
a iff:class .
iff:filterCartridge rdfs:subClassOf iff:entity ;
a iff:class .
iff:operationSchedule rdfs:subClassOf iff:linkedEntity ;
a iff:class .
iff:maintenanceInterval rdfs:subClassOf iff:linkedEntity ;
a iff:class .


iff:scheduleEntity a iff:class .
iff:oeeTemplate rdfs:subClassOf iff:scheduleEntity .

### Machine states
iff:machineState a rdfs:class .
iff:state_OFF a iff:machineState .
iff:state_OFF iff:stateValidFor iff:filter, iff:cutter .
Expand All @@ -43,12 +46,21 @@ iff:state_CLEARING a iff:machineState .
iff:state_CLEARING iff:stateValidFor iff:cutter .
iff:state_CLEANING a iff:machineState .
iff:state_CLEANING iff:stateValidFor iff:filter .
iff:state_ERROR a iff:machineState .
iff:state_ERROR iff:stateValidFor iff:filter, iff:cutter .

### OEE definitions: What state defines availability
iff:state_PROCESSING oee:availability "1" .
iff:state_PREPARING oee:availability "0" .

# labels for states
iff:state_ON rdfs:label "ON" .
iff:state_OFF rdfs:label "OFF" .
iff:state_PREPARING rdfs:label "PREPARING" .
iff:state_PROCESSING rdfs:label "PROCESSING" .
iff:state_CLEARING rdfs:label "ON" .
iff:state_ERROR rdfs:label "ERROR" .

iff:WC0 a iff:WC .
iff:WC1 a iff:WC .
iff:WC2 a iff:WC .
Expand All @@ -60,8 +72,8 @@ iff:WC1 iff:higherHazardLevel iff:WC0 .
#iff:WC1 a rdf:Bag .
iff:WC1 iff:containsMaterialNumber [
a rdf:Bag ;
rdf:_1 "1.4301" ;
rdf:_2 "1.4302" ;
rdf:_1 "1.4301" ;
rdf:_2 "1.4302" ;
rdf:_3 "1.4303" ;
rdf:_4 "1.4304" ;
rdf:_5 "1.4305" ;
Expand Down
Loading

0 comments on commit b8672be

Please sign in to comment.