Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update loaded relations within subscriptions #29

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/modelgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func parse(r io.Reader) ([]collection, error) {

relCollection := strings.Split(modelField.To, "/")[0]
relCollectionName := goName(relCollection)
relations = append(relations, relation{
rel := relation{
GoName: goName(collectionName),
PropName: propName,
PropNameLc: string(propNameLc),
Expand All @@ -127,7 +127,9 @@ func parse(r io.Reader) ([]collection, error) {
IdFieldGo: goName(fieldName),
Required: modelField.Required,
List: modelField.Type == "relation-list",
})
}
f.Relation = &rel
relations = append(relations, rel)
}

if modelField.Type == "generic-relation" {
Expand All @@ -140,7 +142,7 @@ func parse(r io.Reader) ([]collection, error) {
toCollections[c.Collection] = goName(c.Collection)
}

relations = append(relations, relation{
rel := relation{
GoName: goName(collectionName),
PropName: propName,
PropNameLc: string(propNameLc),
Expand All @@ -152,7 +154,9 @@ func parse(r io.Reader) ([]collection, error) {
List: false,
Generic: true,
ToCollections: toCollections,
})
}
f.Relation = &rel
relations = append(relations, rel)
}

fields = append(fields, f)
Expand Down Expand Up @@ -194,6 +198,7 @@ type field struct {
FieldName string
Required bool
SingleRelation bool
Relation *relation
}

type relation struct {
Expand Down
24 changes: 23 additions & 1 deletion cmd/modelgen/model_struct.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"

Expand All @@ -28,6 +29,19 @@ func (m *{{.GoName}}) CollectionName() string {
}

{{end}}
func (m *{{.GoName}}) GetRelated(field string, id int) *RelatedModelsAccessor { {{if .HasRelations}}
switch field { {{range .Relations}}
case "{{.IdField}}":{{if .List}}
for _, r := range m.{{.PropNameLc}} {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}{{else}}
return m.{{.PropNameLc}}.GetRelatedModelsAccessor(){{end}}{{end}}
}
{{end}}
return nil}

func (m *{{.GoName}}) SetRelated(field string, content interface{}) { {{if .HasRelations}}
if content != nil {
switch field { {{range .Relations}}
Expand Down Expand Up @@ -126,7 +140,13 @@ func (m *{{.GoName}}) Update(data map[string]string) error { {{range .Fields}}
if err != nil {
return err
}
}
{{if .Relation}}{{if .Relation.List}}
if _, ok := m.loadedRelations["{{ .Relation.IdField }}"]; ok {
m.{{ .Relation.PropNameLc }} = slices.DeleteFunc(m.{{ .Relation.PropNameLc }}, func (r *{{ .Relation.CollectionName }}) bool {
return !slices.Contains(m.{{ .Relation.IdFieldGo }}, r.ID)
})
}
{{end}}{{end}} }
{{end}}

return nil
Expand All @@ -135,7 +155,9 @@ func (m *{{.GoName}}) Update(data map[string]string) error { {{range .Fields}}
func (m *{{.GoName}}) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
4 changes: 3 additions & 1 deletion pkg/datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type baseModelPtr[T any] interface {
type recursiveSubqueryList struct {
subquerys map[string]*recursiveSubqueryList
Fields []string
fqids []string
}

func (q *recursiveSubqueryList) With(idField string, fields []string) *recursiveSubqueryList {
Expand Down Expand Up @@ -165,7 +166,8 @@ func (q *query[T, PT]) resultStructs() (map[string]PT, error) {
}

func (q *query[T, PT]) recursiveLoadSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList) error {
subDsResult, err := q.datastore.getFull(el.GetFqids(field))
subQuery.fqids = el.GetFqids(field)
subDsResult, err := q.datastore.getFull(subQuery.fqids)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ func (ds *Datastore) setupRedisListener() {

for _, listener := range listeners {
listenerChanged := map[string]map[string]string{}
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
if len(listener.fqids) != 0 {
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
}
}
} else {
listenerChanged = changeMap
}

if len(listenerChanged) > 0 {
listener.channel <- listenerChanged
}
Expand Down
94 changes: 92 additions & 2 deletions pkg/datastore/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"maps"
"reflect"
"slices"
"strconv"
"strings"

"github.com/OpenSlides/openslides-projector-service/pkg/models"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -70,10 +73,18 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
fields: q.Fields,
channel: updateChannel,
}
if len(q.subquerys) != 0 {
listener.fqids = []string{}
}

q.datastore.change.AddListener <- &listener

load := func() error {
listener.fqids = q.fqids
if len(q.subquerys) != 0 {
listener.fqids = []string{}
} else {
listener.fqids = q.fqids
}

data, err := q.GetOne()
if err != nil {
Expand All @@ -98,6 +109,7 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
continue
}

updatedAny := false
if obj, ok := update[q.fqids[0]]; ok {
if obj == nil {
close(notifyChannel)
Expand All @@ -107,7 +119,22 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
if err := model.Update(obj); err != nil {
log.Error().Err(err).Msg("updating subscribed model failed")
}
notifyChannel <- slices.Collect(maps.Keys(obj))

updatedAny = true
}

for field, subQuery := range q.subquerys {
println(field)
update, err := q.recursiveUpdateSubqueries(model.GetRelatedModelsAccessor(), field, subQuery, update)
updatedAny = updatedAny || update
if err != nil {
log.Err(err).Msg("Could not update subscribed subqueries")
continue
}
}

if updatedAny {
notifyChannel <- slices.Collect(maps.Keys(update[q.fqids[0]]))
}
}

Expand All @@ -119,6 +146,69 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
}}, nil
}

func (q *query[T, PT]) recursiveUpdateSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList, update map[string]map[string]string) (bool, error) {
subQuery.fqids = slices.DeleteFunc(subQuery.fqids, func(fqid string) bool {
return !slices.Contains(el.GetFqids(field), fqid)
})

added := []string{}
updatedAny := false
for _, fqid := range el.GetFqids(field) {
if !slices.Contains(subQuery.fqids, fqid) {
added = append(added, fqid)
} else {
p := strings.Split(fqid, "/")
id, err := strconv.Atoi(p[1])
if err != nil {
return false, err
}

model := el.GetRelated(field, id)
if model != nil {
if obj, ok := update[fqid]; ok {
updatedAny = true
err := model.Update(obj)
if err != nil {
return false, err
}
}

for sField, nextSubQuery := range subQuery.subquerys {
updated, err := q.recursiveUpdateSubqueries(model, sField, nextSubQuery, update)
updatedAny = updated || updatedAny
if err != nil {
return false, err
}
}
}
}
}

subDsResult, err := q.datastore.getFull(added)
if err != nil {
return false, err
}

if len(subDsResult) != 0 {
updatedAny = true
for _, dsResult := range subDsResult {
model, err := el.SetRelatedJSON(field, dsResult)
if err != nil {
log.Err(err).Msgf("Failed to parse JSON (%s)", string(dsResult))
} else if model != nil {
for sField, nextSubQuery := range subQuery.subquerys {
err := q.recursiveLoadSubqueries(model, sField, nextSubQuery)
if err != nil {
return false, err
}
}
}
}
}

return len(subDsResult) != 0 || updatedAny, nil
}

func (q *query[T, PT]) SubscribeField(field interface{}) (*subscription[<-chan struct{}], error) {
notifyChannel := make(chan struct{})
updateChannel := make(chan map[string]map[string]string)
Expand Down
6 changes: 6 additions & 0 deletions pkg/models/action_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (m *ActionWorker) CollectionName() string {
return "action_worker"
}

func (m *ActionWorker) GetRelated(field string, id int) *RelatedModelsAccessor {
return nil
}

func (m *ActionWorker) SetRelated(field string, content interface{}) {}

func (m *ActionWorker) SetRelatedJSON(field string, content []byte) (*RelatedModelsAccessor, error) {
Expand Down Expand Up @@ -105,7 +109,9 @@ func (m *ActionWorker) Update(data map[string]string) error {
func (m *ActionWorker) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
52 changes: 52 additions & 0 deletions pkg/models/agenda_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"

Expand Down Expand Up @@ -87,6 +88,37 @@ func (m *AgendaItem) Tags() []*Tag {
return m.tags
}

func (m *AgendaItem) GetRelated(field string, id int) *RelatedModelsAccessor {
switch field {
case "child_ids":
for _, r := range m.childs {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "content_object_id":
return m.contentObject.GetRelatedModelsAccessor()
case "meeting_id":
return m.meeting.GetRelatedModelsAccessor()
case "parent_id":
return m.parent.GetRelatedModelsAccessor()
case "projection_ids":
for _, r := range m.projections {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "tag_ids":
for _, r := range m.tags {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
}

return nil
}

func (m *AgendaItem) SetRelated(field string, content interface{}) {
if content != nil {
switch field {
Expand Down Expand Up @@ -303,6 +335,12 @@ func (m *AgendaItem) Update(data map[string]string) error {
if err != nil {
return err
}

if _, ok := m.loadedRelations["child_ids"]; ok {
m.childs = slices.DeleteFunc(m.childs, func(r *AgendaItem) bool {
return !slices.Contains(m.ChildIDs, r.ID)
})
}
}

if val, ok := data["closed"]; ok {
Expand Down Expand Up @@ -387,13 +425,25 @@ func (m *AgendaItem) Update(data map[string]string) error {
if err != nil {
return err
}

if _, ok := m.loadedRelations["projection_ids"]; ok {
m.projections = slices.DeleteFunc(m.projections, func(r *Projection) bool {
return !slices.Contains(m.ProjectionIDs, r.ID)
})
}
}

if val, ok := data["tag_ids"]; ok {
err := json.Unmarshal([]byte(val), &m.TagIDs)
if err != nil {
return err
}

if _, ok := m.loadedRelations["tag_ids"]; ok {
m.tags = slices.DeleteFunc(m.tags, func(r *Tag) bool {
return !slices.Contains(m.TagIDs, r.ID)
})
}
}

if val, ok := data["type"]; ok {
Expand All @@ -416,7 +466,9 @@ func (m *AgendaItem) Update(data map[string]string) error {
func (m *AgendaItem) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
Loading
Loading