Skip to content

Commit

Permalink
Update loaded relations within subscriptions (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
bastianjoel authored Jan 22, 2025
1 parent 3f923f9 commit 240d3e5
Show file tree
Hide file tree
Showing 54 changed files with 3,047 additions and 203 deletions.
13 changes: 9 additions & 4 deletions cmd/modelgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func parse(r io.Reader) ([]collection, error) {

relCollection := strings.Split(modelField.To, "/")[0]
relCollectionName := goName(relCollection)
relations = append(relations, relation{
rel := relation{
GoName: goName(collectionName),
PropName: propName,
PropNameLc: string(propNameLc),
Expand All @@ -127,7 +127,9 @@ func parse(r io.Reader) ([]collection, error) {
IdFieldGo: goName(fieldName),
Required: modelField.Required,
List: modelField.Type == "relation-list",
})
}
f.Relation = &rel
relations = append(relations, rel)
}

if modelField.Type == "generic-relation" {
Expand All @@ -140,7 +142,7 @@ func parse(r io.Reader) ([]collection, error) {
toCollections[c.Collection] = goName(c.Collection)
}

relations = append(relations, relation{
rel := relation{
GoName: goName(collectionName),
PropName: propName,
PropNameLc: string(propNameLc),
Expand All @@ -152,7 +154,9 @@ func parse(r io.Reader) ([]collection, error) {
List: false,
Generic: true,
ToCollections: toCollections,
})
}
f.Relation = &rel
relations = append(relations, rel)
}

fields = append(fields, f)
Expand Down Expand Up @@ -194,6 +198,7 @@ type field struct {
FieldName string
Required bool
SingleRelation bool
Relation *relation
}

type relation struct {
Expand Down
24 changes: 23 additions & 1 deletion cmd/modelgen/model_struct.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"

Expand All @@ -28,6 +29,19 @@ func (m *{{.GoName}}) CollectionName() string {
}

{{end}}
func (m *{{.GoName}}) GetRelated(field string, id int) *RelatedModelsAccessor { {{if .HasRelations}}
switch field { {{range .Relations}}
case "{{.IdField}}":{{if .List}}
for _, r := range m.{{.PropNameLc}} {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}{{else}}
return m.{{.PropNameLc}}.GetRelatedModelsAccessor(){{end}}{{end}}
}
{{end}}
return nil}

func (m *{{.GoName}}) SetRelated(field string, content interface{}) { {{if .HasRelations}}
if content != nil {
switch field { {{range .Relations}}
Expand Down Expand Up @@ -126,7 +140,13 @@ func (m *{{.GoName}}) Update(data map[string]string) error { {{range .Fields}}
if err != nil {
return err
}
}
{{if .Relation}}{{if .Relation.List}}
if _, ok := m.loadedRelations["{{ .Relation.IdField }}"]; ok {
m.{{ .Relation.PropNameLc }} = slices.DeleteFunc(m.{{ .Relation.PropNameLc }}, func (r *{{ .Relation.CollectionName }}) bool {
return !slices.Contains(m.{{ .Relation.IdFieldGo }}, r.ID)
})
}
{{end}}{{end}} }
{{end}}

return nil
Expand All @@ -135,7 +155,9 @@ func (m *{{.GoName}}) Update(data map[string]string) error { {{range .Fields}}
func (m *{{.GoName}}) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
4 changes: 3 additions & 1 deletion pkg/datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type baseModelPtr[T any] interface {
type recursiveSubqueryList struct {
subquerys map[string]*recursiveSubqueryList
Fields []string
fqids []string
}

func (q *recursiveSubqueryList) With(idField string, fields []string) *recursiveSubqueryList {
Expand Down Expand Up @@ -165,7 +166,8 @@ func (q *query[T, PT]) resultStructs() (map[string]PT, error) {
}

func (q *query[T, PT]) recursiveLoadSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList) error {
subDsResult, err := q.datastore.getFull(el.GetFqids(field))
subQuery.fqids = el.GetFqids(field)
subDsResult, err := q.datastore.getFull(subQuery.fqids)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ func (ds *Datastore) setupRedisListener() {

for _, listener := range listeners {
listenerChanged := map[string]map[string]string{}
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
if len(listener.fqids) != 0 {
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
}
}
} else {
listenerChanged = changeMap
}

if len(listenerChanged) > 0 {
listener.channel <- listenerChanged
}
Expand Down
94 changes: 92 additions & 2 deletions pkg/datastore/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"maps"
"reflect"
"slices"
"strconv"
"strings"

"github.com/OpenSlides/openslides-projector-service/pkg/models"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -70,10 +73,18 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
fields: q.Fields,
channel: updateChannel,
}
if len(q.subquerys) != 0 {
listener.fqids = []string{}
}

q.datastore.change.AddListener <- &listener

load := func() error {
listener.fqids = q.fqids
if len(q.subquerys) != 0 {
listener.fqids = []string{}
} else {
listener.fqids = q.fqids
}

data, err := q.GetOne()
if err != nil {
Expand All @@ -98,6 +109,7 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
continue
}

updatedAny := false
if obj, ok := update[q.fqids[0]]; ok {
if obj == nil {
close(notifyChannel)
Expand All @@ -107,7 +119,22 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
if err := model.Update(obj); err != nil {
log.Error().Err(err).Msg("updating subscribed model failed")
}
notifyChannel <- slices.Collect(maps.Keys(obj))

updatedAny = true
}

for field, subQuery := range q.subquerys {
println(field)
update, err := q.recursiveUpdateSubqueries(model.GetRelatedModelsAccessor(), field, subQuery, update)
updatedAny = updatedAny || update
if err != nil {
log.Err(err).Msg("Could not update subscribed subqueries")
continue
}
}

if updatedAny {
notifyChannel <- slices.Collect(maps.Keys(update[q.fqids[0]]))
}
}

Expand All @@ -119,6 +146,69 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
}}, nil
}

func (q *query[T, PT]) recursiveUpdateSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList, update map[string]map[string]string) (bool, error) {
subQuery.fqids = slices.DeleteFunc(subQuery.fqids, func(fqid string) bool {
return !slices.Contains(el.GetFqids(field), fqid)
})

added := []string{}
updatedAny := false
for _, fqid := range el.GetFqids(field) {
if !slices.Contains(subQuery.fqids, fqid) {
added = append(added, fqid)
} else {
p := strings.Split(fqid, "/")
id, err := strconv.Atoi(p[1])
if err != nil {
return false, err
}

model := el.GetRelated(field, id)
if model != nil {
if obj, ok := update[fqid]; ok {
updatedAny = true
err := model.Update(obj)
if err != nil {
return false, err
}
}

for sField, nextSubQuery := range subQuery.subquerys {
updated, err := q.recursiveUpdateSubqueries(model, sField, nextSubQuery, update)
updatedAny = updated || updatedAny
if err != nil {
return false, err
}
}
}
}
}

subDsResult, err := q.datastore.getFull(added)
if err != nil {
return false, err
}

if len(subDsResult) != 0 {
updatedAny = true
for _, dsResult := range subDsResult {
model, err := el.SetRelatedJSON(field, dsResult)
if err != nil {
log.Err(err).Msgf("Failed to parse JSON (%s)", string(dsResult))
} else if model != nil {
for sField, nextSubQuery := range subQuery.subquerys {
err := q.recursiveLoadSubqueries(model, sField, nextSubQuery)
if err != nil {
return false, err
}
}
}
}
}

return len(subDsResult) != 0 || updatedAny, nil
}

func (q *query[T, PT]) SubscribeField(field interface{}) (*subscription[<-chan struct{}], error) {
notifyChannel := make(chan struct{})
updateChannel := make(chan map[string]map[string]string)
Expand Down
6 changes: 6 additions & 0 deletions pkg/models/action_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (m *ActionWorker) CollectionName() string {
return "action_worker"
}

func (m *ActionWorker) GetRelated(field string, id int) *RelatedModelsAccessor {
return nil
}

func (m *ActionWorker) SetRelated(field string, content interface{}) {}

func (m *ActionWorker) SetRelatedJSON(field string, content []byte) (*RelatedModelsAccessor, error) {
Expand Down Expand Up @@ -105,7 +109,9 @@ func (m *ActionWorker) Update(data map[string]string) error {
func (m *ActionWorker) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
52 changes: 52 additions & 0 deletions pkg/models/agenda_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"

Expand Down Expand Up @@ -87,6 +88,37 @@ func (m *AgendaItem) Tags() []*Tag {
return m.tags
}

func (m *AgendaItem) GetRelated(field string, id int) *RelatedModelsAccessor {
switch field {
case "child_ids":
for _, r := range m.childs {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "content_object_id":
return m.contentObject.GetRelatedModelsAccessor()
case "meeting_id":
return m.meeting.GetRelatedModelsAccessor()
case "parent_id":
return m.parent.GetRelatedModelsAccessor()
case "projection_ids":
for _, r := range m.projections {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "tag_ids":
for _, r := range m.tags {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
}

return nil
}

func (m *AgendaItem) SetRelated(field string, content interface{}) {
if content != nil {
switch field {
Expand Down Expand Up @@ -303,6 +335,12 @@ func (m *AgendaItem) Update(data map[string]string) error {
if err != nil {
return err
}

if _, ok := m.loadedRelations["child_ids"]; ok {
m.childs = slices.DeleteFunc(m.childs, func(r *AgendaItem) bool {
return !slices.Contains(m.ChildIDs, r.ID)
})
}
}

if val, ok := data["closed"]; ok {
Expand Down Expand Up @@ -387,13 +425,25 @@ func (m *AgendaItem) Update(data map[string]string) error {
if err != nil {
return err
}

if _, ok := m.loadedRelations["projection_ids"]; ok {
m.projections = slices.DeleteFunc(m.projections, func(r *Projection) bool {
return !slices.Contains(m.ProjectionIDs, r.ID)
})
}
}

if val, ok := data["tag_ids"]; ok {
err := json.Unmarshal([]byte(val), &m.TagIDs)
if err != nil {
return err
}

if _, ok := m.loadedRelations["tag_ids"]; ok {
m.tags = slices.DeleteFunc(m.tags, func(r *Tag) bool {
return !slices.Contains(m.TagIDs, r.ID)
})
}
}

if val, ok := data["type"]; ok {
Expand All @@ -416,7 +466,9 @@ func (m *AgendaItem) Update(data map[string]string) error {
func (m *AgendaItem) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
Loading

0 comments on commit 240d3e5

Please sign in to comment.