Skip to content

Commit

Permalink
Subscription updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bastianjoel committed Jan 22, 2025
1 parent a897d78 commit 5feb91a
Show file tree
Hide file tree
Showing 52 changed files with 1,946 additions and 197 deletions.
15 changes: 15 additions & 0 deletions cmd/modelgen/model_struct.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ func (m *{{.GoName}}) CollectionName() string {
}

{{end}}
func (m *{{.GoName}}) GetRelated(field string, id int) *RelatedModelsAccessor { {{if .HasRelations}}
switch field { {{range .Relations}}
case "{{.IdField}}":{{if .List}}
for _, r := range m.{{.PropNameLc}} {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}{{else}}
return m.{{.PropNameLc}}.GetRelatedModelsAccessor(){{end}}{{end}}
}
{{end}}
return nil}

func (m *{{.GoName}}) SetRelated(field string, content interface{}) { {{if .HasRelations}}
if content != nil {
switch field { {{range .Relations}}
Expand Down Expand Up @@ -142,7 +155,9 @@ func (m *{{.GoName}}) Update(data map[string]string) error { {{range .Fields}}
func (m *{{.GoName}}) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
4 changes: 3 additions & 1 deletion pkg/datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type baseModelPtr[T any] interface {
type recursiveSubqueryList struct {
subquerys map[string]*recursiveSubqueryList
Fields []string
fqids []string
}

func (q *recursiveSubqueryList) With(idField string, fields []string) *recursiveSubqueryList {
Expand Down Expand Up @@ -165,7 +166,8 @@ func (q *query[T, PT]) resultStructs() (map[string]PT, error) {
}

func (q *query[T, PT]) recursiveLoadSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList) error {
subDsResult, err := q.datastore.getFull(el.GetFqids(field))
subQuery.fqids = el.GetFqids(field)
subDsResult, err := q.datastore.getFull(subQuery.fqids)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ func (ds *Datastore) setupRedisListener() {

for _, listener := range listeners {
listenerChanged := map[string]map[string]string{}
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
if len(listener.fqids) != 0 {
for _, fqid := range listener.fqids {
if val, ok := changeMap[fqid]; ok {
listenerChanged[fqid] = val
}
}
} else {
listenerChanged = changeMap
}

if len(listenerChanged) > 0 {
listener.channel <- listenerChanged
}
Expand Down
94 changes: 92 additions & 2 deletions pkg/datastore/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"maps"
"reflect"
"slices"
"strconv"
"strings"

"github.com/OpenSlides/openslides-projector-service/pkg/models"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -70,10 +73,18 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
fields: q.Fields,
channel: updateChannel,
}
if len(q.subquerys) != 0 {
listener.fqids = []string{}
}

q.datastore.change.AddListener <- &listener

load := func() error {
listener.fqids = q.fqids
if len(q.subquerys) != 0 {
listener.fqids = []string{}
} else {
listener.fqids = q.fqids
}

data, err := q.GetOne()
if err != nil {
Expand All @@ -98,6 +109,7 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
continue
}

updatedAny := false
if obj, ok := update[q.fqids[0]]; ok {
if obj == nil {
close(notifyChannel)
Expand All @@ -107,7 +119,22 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
if err := model.Update(obj); err != nil {
log.Error().Err(err).Msg("updating subscribed model failed")
}
notifyChannel <- slices.Collect(maps.Keys(obj))

updatedAny = true
}

for field, subQuery := range q.subquerys {
println(field)
update, err := q.recursiveUpdateSubqueries(model.GetRelatedModelsAccessor(), field, subQuery, update)
updatedAny = updatedAny || update
if err != nil {
log.Err(err).Msg("Could not update subscribed subqueries")
continue
}
}

if updatedAny {
notifyChannel <- slices.Collect(maps.Keys(update[q.fqids[0]]))
}
}

Expand All @@ -119,6 +146,69 @@ func (q *query[T, PT]) SubscribeOne(model PT) (*subscription[<-chan []string], e
}}, nil
}

func (q *query[T, PT]) recursiveUpdateSubqueries(el *models.RelatedModelsAccessor, field string, subQuery *recursiveSubqueryList, update map[string]map[string]string) (bool, error) {
subQuery.fqids = slices.DeleteFunc(subQuery.fqids, func(fqid string) bool {
return !slices.Contains(el.GetFqids(field), fqid)
})

added := []string{}
updatedAny := false
for _, fqid := range el.GetFqids(field) {
if !slices.Contains(subQuery.fqids, fqid) {
added = append(added, fqid)
} else {
p := strings.Split(fqid, "/")
id, err := strconv.Atoi(p[1])
if err != nil {
return false, err
}

model := el.GetRelated(field, id)
if model != nil {
if obj, ok := update[fqid]; ok {
updatedAny = true
err := model.Update(obj)
if err != nil {
return false, err
}
}

for sField, nextSubQuery := range subQuery.subquerys {
updated, err := q.recursiveUpdateSubqueries(model, sField, nextSubQuery, update)
updatedAny = updated || updatedAny
if err != nil {
return false, err
}
}
}
}
}

subDsResult, err := q.datastore.getFull(added)
if err != nil {
return false, err
}

if len(subDsResult) != 0 {
updatedAny = true
for _, dsResult := range subDsResult {
model, err := el.SetRelatedJSON(field, dsResult)
if err != nil {
log.Err(err).Msgf("Failed to parse JSON (%s)", string(dsResult))
} else if model != nil {
for sField, nextSubQuery := range subQuery.subquerys {
err := q.recursiveLoadSubqueries(model, sField, nextSubQuery)
if err != nil {
return false, err
}
}
}
}
}

return len(subDsResult) != 0 || updatedAny, nil
}

func (q *query[T, PT]) SubscribeField(field interface{}) (*subscription[<-chan struct{}], error) {
notifyChannel := make(chan struct{})
updateChannel := make(chan map[string]map[string]string)
Expand Down
6 changes: 6 additions & 0 deletions pkg/models/action_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (m *ActionWorker) CollectionName() string {
return "action_worker"
}

func (m *ActionWorker) GetRelated(field string, id int) *RelatedModelsAccessor {
return nil
}

func (m *ActionWorker) SetRelated(field string, content interface{}) {}

func (m *ActionWorker) SetRelatedJSON(field string, content []byte) (*RelatedModelsAccessor, error) {
Expand Down Expand Up @@ -105,7 +109,9 @@ func (m *ActionWorker) Update(data map[string]string) error {
func (m *ActionWorker) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
33 changes: 33 additions & 0 deletions pkg/models/agenda_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,37 @@ func (m *AgendaItem) Tags() []*Tag {
return m.tags
}

func (m *AgendaItem) GetRelated(field string, id int) *RelatedModelsAccessor {
switch field {
case "child_ids":
for _, r := range m.childs {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "content_object_id":
return m.contentObject.GetRelatedModelsAccessor()
case "meeting_id":
return m.meeting.GetRelatedModelsAccessor()
case "parent_id":
return m.parent.GetRelatedModelsAccessor()
case "projection_ids":
for _, r := range m.projections {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "tag_ids":
for _, r := range m.tags {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
}

return nil
}

func (m *AgendaItem) SetRelated(field string, content interface{}) {
if content != nil {
switch field {
Expand Down Expand Up @@ -435,7 +466,9 @@ func (m *AgendaItem) Update(data map[string]string) error {
func (m *AgendaItem) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
45 changes: 45 additions & 0 deletions pkg/models/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,49 @@ func (m *Assignment) Tags() []*Tag {
return m.tags
}

func (m *Assignment) GetRelated(field string, id int) *RelatedModelsAccessor {
switch field {
case "agenda_item_id":
return m.agendaItem.GetRelatedModelsAccessor()
case "attachment_meeting_mediafile_ids":
for _, r := range m.attachmentMeetingMediafiles {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "candidate_ids":
for _, r := range m.candidates {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "list_of_speakers_id":
return m.listOfSpeakers.GetRelatedModelsAccessor()
case "meeting_id":
return m.meeting.GetRelatedModelsAccessor()
case "poll_ids":
for _, r := range m.polls {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "projection_ids":
for _, r := range m.projections {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
case "tag_ids":
for _, r := range m.tags {
if r.ID == id {
return r.GetRelatedModelsAccessor()
}
}
}

return nil
}

func (m *Assignment) SetRelated(field string, content interface{}) {
if content != nil {
switch field {
Expand Down Expand Up @@ -468,7 +511,9 @@ func (m *Assignment) Update(data map[string]string) error {
func (m *Assignment) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
15 changes: 15 additions & 0 deletions pkg/models/assignment_candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (m *AssignmentCandidate) MeetingUser() *MeetingUser {
return m.meetingUser
}

func (m *AssignmentCandidate) GetRelated(field string, id int) *RelatedModelsAccessor {
switch field {
case "assignment_id":
return m.assignment.GetRelatedModelsAccessor()
case "meeting_id":
return m.meeting.GetRelatedModelsAccessor()
case "meeting_user_id":
return m.meetingUser.GetRelatedModelsAccessor()
}

return nil
}

func (m *AssignmentCandidate) SetRelated(field string, content interface{}) {
if content != nil {
switch field {
Expand Down Expand Up @@ -187,7 +200,9 @@ func (m *AssignmentCandidate) Update(data map[string]string) error {
func (m *AssignmentCandidate) GetRelatedModelsAccessor() *RelatedModelsAccessor {
return &RelatedModelsAccessor{
m.GetFqids,
m.GetRelated,
m.SetRelated,
m.SetRelatedJSON,
m.Update,
}
}
Loading

0 comments on commit 5feb91a

Please sign in to comment.