Skip to content

Commit

Permalink
fix(server): reduce size of queries sent to MongoDB (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
rot1024 authored Jul 3, 2023
1 parent 466face commit b49f0ff
Show file tree
Hide file tree
Showing 28 changed files with 318 additions and 234 deletions.
18 changes: 9 additions & 9 deletions server/internal/infrastructure/mongo/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (r *Asset) paginate(ctx context.Context, filter any, sort *asset.SortType,
}
}

c := mongodoc.NewAssetConsumer()
pageInfo, err := r.client.Paginate(ctx, r.readFilter(filter), usort, pagination, c)
c := mongodoc.NewAssetConsumer(r.f.Readable)
pageInfo, err := r.client.Paginate(ctx, filter, usort, pagination, c)
if err != nil {
return nil, nil, rerror.ErrInternalByWithContext(ctx, err)
}
Expand All @@ -141,16 +141,16 @@ func (r *Asset) paginate(ctx context.Context, filter any, sort *asset.SortType,
}

func (r *Asset) find(ctx context.Context, filter any) ([]*asset.Asset, error) {
c := mongodoc.NewAssetConsumer()
if err2 := r.client.Find(ctx, r.readFilter(filter), c); err2 != nil {
c := mongodoc.NewAssetConsumer(r.f.Readable)
if err2 := r.client.Find(ctx, filter, c); err2 != nil {
return nil, rerror.ErrInternalByWithContext(ctx, err2)
}
return c.Result, nil
}

func (r *Asset) findOne(ctx context.Context, filter any) (*asset.Asset, error) {
c := mongodoc.NewAssetConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewAssetConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
return c.Result[0], nil
Expand All @@ -171,9 +171,9 @@ func filterAssets(ids []id.AssetID, rows []*asset.Asset) []*asset.Asset {
return res
}

func (r *Asset) readFilter(filter any) any {
return applyWorkspaceFilter(filter, r.f.Readable)
}
// func (r *Asset) readFilter(filter any) any {
// return applyWorkspaceFilter(filter, r.f.Readable)
// }

func (r *Asset) writeFilter(filter any) any {
return applyWorkspaceFilter(filter, r.f.Writable)
Expand Down
19 changes: 11 additions & 8 deletions server/internal/infrastructure/mongo/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,15 @@ func (r *Dataset) FindBySchemaAllBy(ctx context.Context, s id.DatasetSchemaID, c
if err != nil {
return err
}
if r.f.Readable != nil && !r.f.Readable.Has(m.Scene()) {
return nil
}
return cb(m)
})

if err := r.client.Find(ctx, r.readFilter(bson.M{
if err := r.client.Find(ctx, bson.M{
"schema": s.String(),
}), c); err != nil {
}, c); err != nil {
return rerror.ErrInternalByWithContext(ctx, err)
}

Expand Down Expand Up @@ -337,24 +340,24 @@ func (r *Dataset) RemoveByScene(ctx context.Context, sceneID id.SceneID) error {
}

func (r *Dataset) find(ctx context.Context, dst dataset.List, filter interface{}) (dataset.List, error) {
c := mongodoc.NewDatasetConsumer()
if err2 := r.client.Find(ctx, r.readFilter(filter), c); err2 != nil {
c := mongodoc.NewDatasetConsumer(r.f.Readable)
if err2 := r.client.Find(ctx, filter, c); err2 != nil {
return nil, rerror.ErrInternalByWithContext(ctx, err2)
}
return c.Result, nil
}

func (r *Dataset) findOne(ctx context.Context, filter interface{}) (*dataset.Dataset, error) {
c := mongodoc.NewDatasetConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewDatasetConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
return c.Result[0], nil
}

func (r *Dataset) paginate(ctx context.Context, filter bson.M, pagination *usecasex.Pagination) (dataset.List, *usecasex.PageInfo, error) {
c := mongodoc.NewDatasetConsumer()
pageInfo, err := r.client.Paginate(ctx, r.readFilter(filter), nil, pagination, c)
c := mongodoc.NewDatasetConsumer(r.f.Readable)
pageInfo, err := r.client.Paginate(ctx, filter, nil, pagination, c)
if err != nil {
return nil, nil, rerror.ErrInternalByWithContext(ctx, err)
}
Expand Down
27 changes: 16 additions & 11 deletions server/internal/infrastructure/mongo/dataset_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"go.mongodb.org/mongo-driver/bson"
"golang.org/x/exp/slices"

"github.com/reearth/reearth/server/internal/infrastructure/mongo/mongodoc"
"github.com/reearth/reearth/server/internal/usecase/repo"
Expand Down Expand Up @@ -93,9 +94,13 @@ func (r *DatasetSchema) FindBySceneAndSource(ctx context.Context, sceneID id.Sce
}

func (r *DatasetSchema) CountByScene(ctx context.Context, id id.SceneID) (int, error) {
res, err := r.client.Count(ctx, r.readFilter(bson.M{
if r.f.Readable != nil && !slices.Contains(r.f.Readable, id) {
return 0, nil
}

res, err := r.client.Count(ctx, bson.M{
"scene": id.String(),
}))
})
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -144,24 +149,24 @@ func (r *DatasetSchema) RemoveByScene(ctx context.Context, sceneID id.SceneID) e
}

func (r *DatasetSchema) find(ctx context.Context, filter any) ([]*dataset.Schema, error) {
c := mongodoc.NewDatasetSchemaConsumer()
if err := r.client.Find(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewDatasetSchemaConsumer(r.f.Readable)
if err := r.client.Find(ctx, filter, c); err != nil {
return nil, err
}
return c.Result, nil
}

func (r *DatasetSchema) findOne(ctx context.Context, filter any) (*dataset.Schema, error) {
c := mongodoc.NewDatasetSchemaConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewDatasetSchemaConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
return c.Result[0], nil
}

func (r *DatasetSchema) paginate(ctx context.Context, filter bson.M, pagination *usecasex.Pagination) ([]*dataset.Schema, *usecasex.PageInfo, error) {
c := mongodoc.NewDatasetSchemaConsumer()
pageInfo, err := r.client.Paginate(ctx, r.readFilter(filter), nil, pagination, c)
c := mongodoc.NewDatasetSchemaConsumer(r.f.Readable)
pageInfo, err := r.client.Paginate(ctx, filter, nil, pagination, c)
if err != nil {
return nil, nil, rerror.ErrInternalByWithContext(ctx, err)
}
Expand All @@ -183,9 +188,9 @@ func filterDatasetSchemas(ids []id.DatasetSchemaID, rows []*dataset.Schema) []*d
return res
}

func (r *DatasetSchema) readFilter(filter any) any {
return applySceneFilter(filter, r.f.Readable)
}
// func (r *DatasetSchema) readFilter(filter any) any {
// return applySceneFilter(filter, r.f.Readable)
// }

func (r *DatasetSchema) writeFilter(filter any) any {
return applySceneFilter(filter, r.f.Writable)
Expand Down
30 changes: 15 additions & 15 deletions server/internal/infrastructure/mongo/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,16 @@ func (r *Layer) RemoveByScene(ctx context.Context, sceneID id.SceneID) error {
}

func (r *Layer) find(ctx context.Context, dst layer.List, filter interface{}) (layer.List, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.Find(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.Find(ctx, filter, c); err != nil {
return nil, err
}
return lo.ToSlicePtr(c.Result), nil
}

func (r *Layer) findOne(ctx context.Context, filter interface{}) (layer.Layer, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
if len(c.Result) == 0 {
Expand All @@ -288,8 +288,8 @@ func (r *Layer) findOne(ctx context.Context, filter interface{}) (layer.Layer, e
}

func (r *Layer) findItemOne(ctx context.Context, filter interface{}) (*layer.Item, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
if len(c.Result) == 0 {
Expand All @@ -299,8 +299,8 @@ func (r *Layer) findItemOne(ctx context.Context, filter interface{}) (*layer.Ite
}

func (r *Layer) findGroupOne(ctx context.Context, filter interface{}) (*layer.Group, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.FindOne(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.FindOne(ctx, filter, c); err != nil {
return nil, err
}
if len(c.Result) == 0 {
Expand All @@ -310,16 +310,16 @@ func (r *Layer) findGroupOne(ctx context.Context, filter interface{}) (*layer.Gr
}

func (r *Layer) findItems(ctx context.Context, dst layer.ItemList, filter interface{}) (layer.ItemList, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.Find(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.Find(ctx, filter, c); err != nil {
return nil, err
}
return layer.List(lo.ToSlicePtr(c.Result)).ToLayerItemList(), nil
}

func (r *Layer) findGroups(ctx context.Context, dst layer.GroupList, filter interface{}) (layer.GroupList, error) {
c := mongodoc.NewLayerConsumer()
if err := r.client.Find(ctx, r.readFilter(filter), c); err != nil {
c := mongodoc.NewLayerConsumer(r.f.Readable)
if err := r.client.Find(ctx, filter, c); err != nil {
return nil, err
}
return layer.List(lo.ToSlicePtr(c.Result)).ToLayerGroupList(), nil
Expand Down Expand Up @@ -373,9 +373,9 @@ func filterLayerGroups(ids []id.LayerID, rows []*layer.Group) []*layer.Group {
return res
}

func (r *Layer) readFilter(filter interface{}) interface{} {
return applySceneFilter(filter, r.f.Readable)
}
// func (r *Layer) readFilter(filter interface{}) interface{} {
// return applySceneFilter(filter, r.f.Readable)
// }

func (r *Layer) writeFilter(filter interface{}) interface{} {
return applySceneFilter(filter, r.f.Writable)
Expand Down
10 changes: 6 additions & 4 deletions server/internal/infrastructure/mongo/mongodoc/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/reearth/reearth/server/pkg/asset"
"github.com/reearth/reearth/server/pkg/id"
"github.com/reearth/reearthx/mongox"
"golang.org/x/exp/slices"
)

type AssetDocument struct {
Expand All @@ -18,10 +18,12 @@ type AssetDocument struct {
ContentType string
}

type AssetConsumer = mongox.SliceFuncConsumer[*AssetDocument, *asset.Asset]
type AssetConsumer = Consumer[*AssetDocument, *asset.Asset]

func NewAssetConsumer() *AssetConsumer {
return NewComsumer[*AssetDocument, *asset.Asset]()
func NewAssetConsumer(workspaces []id.WorkspaceID) *AssetConsumer {
return NewConsumer[*AssetDocument, *asset.Asset](func(a *asset.Asset) bool {
return workspaces == nil || slices.Contains(workspaces, a.Workspace())
})
}

func NewAsset(asset *asset.Asset) (*AssetDocument, string) {
Expand Down
37 changes: 30 additions & 7 deletions server/internal/infrastructure/mongo/mongodoc/consumer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
package mongodoc

import "github.com/reearth/reearthx/mongox"

func NewComsumer[T Model[U], U any]() *mongox.SliceFuncConsumer[T, U] {
return mongox.NewSliceFuncConsumer(func(d T) (U, error) {
return d.Model()
})
}
import (
"github.com/reearth/reearthx/mongox"
"go.mongodb.org/mongo-driver/bson"
)

type Model[T any] interface {
Model() (T, error)
}

type Consumer[T, K any] struct {
Result []K
c mongox.SimpleConsumer[T]
}

func NewConsumer[T Model[U], U any](filter func(U) bool) *Consumer[T, U] {
var c *Consumer[T, U]
c = &Consumer[T, U]{
c: mongox.SimpleConsumer[T](func(d T) error {
e, err := d.Model()
if err != nil {
return err
}
if filter == nil || filter(e) {
c.Result = append(c.Result, e)
}
return nil
}),
}
return c
}

func (s *Consumer[T, K]) Consume(raw bson.Raw) error {
return s.c.Consume(raw)
}
9 changes: 6 additions & 3 deletions server/internal/infrastructure/mongo/mongodoc/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mongodoc

import (
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/exp/slices"

"github.com/reearth/reearth/server/pkg/dataset"
"github.com/reearth/reearth/server/pkg/id"
Expand Down Expand Up @@ -30,10 +31,12 @@ type DatasetExtendedDocument struct {
Depth int
}

type DatasetConsumer = mongox.SliceFuncConsumer[*DatasetDocument, *dataset.Dataset]
type DatasetConsumer = Consumer[*DatasetDocument, *dataset.Dataset]

func NewDatasetConsumer() *DatasetConsumer {
return NewComsumer[*DatasetDocument, *dataset.Dataset]()
func NewDatasetConsumer(scenes []id.SceneID) *DatasetConsumer {
return NewConsumer[*DatasetDocument, *dataset.Dataset](func(a *dataset.Dataset) bool {
return scenes == nil || slices.Contains(scenes, a.Scene())
})
}

type DatasetMapConsumer struct {
Expand Down
10 changes: 6 additions & 4 deletions server/internal/infrastructure/mongo/mongodoc/dataset_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/reearth/reearth/server/pkg/dataset"
"github.com/reearth/reearth/server/pkg/id"
"github.com/reearth/reearth/server/pkg/scene"
"github.com/reearth/reearthx/mongox"
"golang.org/x/exp/slices"
)

type DatasetSchemaDocument struct {
Expand All @@ -23,10 +23,12 @@ type DatasetSchemaFieldDocument struct {
Source string
}

type DatasetSchemaConsumer = mongox.SliceFuncConsumer[*DatasetSchemaDocument, *dataset.Schema]
type DatasetSchemaConsumer = Consumer[*DatasetSchemaDocument, *dataset.Schema]

func NewDatasetSchemaConsumer() *DatasetSchemaConsumer {
return NewComsumer[*DatasetSchemaDocument, *dataset.Schema]()
func NewDatasetSchemaConsumer(scenes []id.SceneID) *DatasetSchemaConsumer {
return NewConsumer[*DatasetSchemaDocument, *dataset.Schema](func(a *dataset.Schema) bool {
return scenes == nil || slices.Contains(scenes, a.Scene())
})
}

func (d *DatasetSchemaDocument) Model() (*dataset.Schema, error) {
Expand Down
10 changes: 6 additions & 4 deletions server/internal/infrastructure/mongo/mongodoc/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/reearth/reearth/server/pkg/id"
"github.com/reearth/reearth/server/pkg/layer"
"github.com/reearth/reearth/server/pkg/scene"
"github.com/reearth/reearthx/mongox"
"golang.org/x/exp/slices"
)

type LayerDocument struct {
Expand Down Expand Up @@ -53,10 +53,12 @@ type LayerTagDocument struct {

type LayerTagListDocument []LayerTagDocument

type LayerConsumer = mongox.SliceFuncConsumer[*LayerDocument, layer.Layer]
type LayerConsumer = Consumer[*LayerDocument, layer.Layer]

func NewLayerConsumer() *LayerConsumer {
return NewComsumer[*LayerDocument, layer.Layer]()
func NewLayerConsumer(scenes []id.SceneID) *LayerConsumer {
return NewConsumer[*LayerDocument, layer.Layer](func(a layer.Layer) bool {
return scenes == nil || slices.Contains(scenes, a.Scene())
})
}

func NewLayer(l layer.Layer) (*LayerDocument, string) {
Expand Down
Loading

0 comments on commit b49f0ff

Please sign in to comment.