Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add draft MVP for handling events from feed #59

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@ go 1.23

toolchain go1.23.1

replace github.com/goverland-labs/goverland-core-web-api/protocol => ./protocol

require (
github.com/caarlos0/env/v10 v10.0.0
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/goverland-labs/goverland-core-feed/protocol v0.1.1
github.com/goverland-labs/goverland-core-feed/protocol v0.1.2-0.20250117145742-cf29516364e7
github.com/goverland-labs/goverland-core-storage/protocol v0.4.1-0.20241219055229-dd1464b9178c
github.com/goverland-labs/goverland-core-web-api/protocol v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.14.0
github.com/rs/zerolog v1.29.0
github.com/s-larionov/process-manager v0.0.1
github.com/shopspring/decimal v1.3.1
go.openly.dev/pointy v1.3.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142
google.golang.org/grpc v1.67.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
)

require (
Expand All @@ -31,8 +35,7 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
)
44 changes: 28 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -141,10 +145,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/goverland-labs/goverland-core-feed/protocol v0.1.1 h1:MCglgp6jgTwg28JFC5Qh2R2i49BkD4IIS3ro3a4WORI=
github.com/goverland-labs/goverland-core-feed/protocol v0.1.1/go.mod h1:fazb3ain351U8hH6XHC0/ThauiDvE6ehNWIyXtKjxg0=
github.com/goverland-labs/goverland-core-storage/protocol v0.4.0 h1:qMA4zrZmIFMe32nxC8bvwMZvcsSO4JJxlca6Hj9kHkk=
github.com/goverland-labs/goverland-core-storage/protocol v0.4.0/go.mod h1:GGKNmeR9GAoY//i7/kZaBVQHb8v7S8TmnLhhT77wETY=
github.com/goverland-labs/goverland-core-feed/protocol v0.1.2-0.20250117145742-cf29516364e7 h1:IjWiVrMfXuuFrK+XTB+PSjZi1D+AQHrfIZL1/53sSvQ=
github.com/goverland-labs/goverland-core-feed/protocol v0.1.2-0.20250117145742-cf29516364e7/go.mod h1:WPMgKlDpWHRH20A63NO9GtiznPgdNsD78wCOH7Pl+ow=
github.com/goverland-labs/goverland-core-storage/protocol v0.4.1-0.20241219055229-dd1464b9178c h1:cfJhkEqLHK28PHF3NpDIYnJAgo4cG5fBZRLZGVGH/Sk=
github.com/goverland-labs/goverland-core-storage/protocol v0.4.1-0.20241219055229-dd1464b9178c/go.mod h1:GGKNmeR9GAoY//i7/kZaBVQHb8v7S8TmnLhhT77wETY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -244,6 +246,16 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.openly.dev/pointy v1.3.0 h1:keht3ObkbDNdY8PWPwB7Kcqk+MAlNStk5kXZTxukE68=
go.openly.dev/pointy v1.3.0/go.mod h1:rccSKiQDQ2QkNfSVT2KG8Budnfhf3At8IWxy/3ElYes=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -311,8 +323,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -369,8 +381,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -380,8 +392,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -480,8 +492,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand All @@ -494,8 +506,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand All @@ -508,8 +520,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
63 changes: 63 additions & 0 deletions internal/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package grpc

import (
"fmt"
"time"

"github.com/goverland-labs/goverland-core-web-api/protocol/feedpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Server struct {
feedpb.UnimplementedFeedEventsServer

service *Service
}

func NewServer(sp *Service) *Server {
return &Server{
service: sp,
}
}

func (s *Server) EventsSubscribe(req *feedpb.EventsSubscribeRequest, stream grpc.ServerStreamingServer[feedpb.FeedItem]) error {
ctx := stream.Context()

var lastUpdated *time.Time
if req.GetLastUpdatedAt() != nil {
lu := req.GetLastUpdatedAt().AsTime()
lastUpdated = &lu
}

events := s.service.GetFeedItems(ctx, ItemsRequest{
SubscriberID: req.GetSubscriberId(),
SubscriptionTypes: req.GetSubscriptionTypes(),
LastUpdatedAt: lastUpdated,
})

for {
var (
event Result
ok bool
)

select {
case <-ctx.Done():
return fmt.Errorf("context done: %w", ctx.Err())
case event, ok = <-events:
if !ok {
return status.Error(codes.Canceled, "events channel closed")
}

if event.Err != nil {
return status.Errorf(codes.Internal, "internal error on getting data: %v", event.Err)
}
}

if err := stream.Send(event.Item); err != nil {
return fmt.Errorf("send event: %w", err)
}
}
}
184 changes: 184 additions & 0 deletions internal/grpc/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package grpc

import (
"context"
"errors"
"fmt"
"io"
"slices"
"time"

feedproto "github.com/goverland-labs/goverland-core-feed/protocol/feedpb"
internalproto "github.com/goverland-labs/goverland-core-web-api/protocol/feedpb"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/types/known/timestamppb"
)

type Service struct {
coreFeed feedproto.FeedEventsClient
}

type ItemsRequest struct {
SubscriberID string
SubscriptionTypes []internalproto.FeedItemType
LastUpdatedAt *time.Time
}

type FeedItem struct{}

type Result struct {
Item *internalproto.FeedItem
Err error
}

func NewService(fc feedproto.FeedEventsClient) *Service {
return &Service{
coreFeed: fc,
}
}

func (s *Service) GetFeedItems(ctx context.Context, req ItemsRequest) <-chan Result {
ch := make(chan Result, 100)
defer close(ch)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
defer cancel()

var updatedAt *timestamppb.Timestamp
if req.LastUpdatedAt != nil {
updatedAt = timestamppb.New(*req.LastUpdatedAt)
}

stream, errC := s.coreFeed.EventsSubscribe(ctx, &feedproto.EventsSubscribeRequest{
SubscriberId: req.SubscriberID,
SubscriptionTypes: convertTypesToFeedProto(req.SubscriptionTypes),
LastUpdatedAt: updatedAt,
})
if errC != nil {
ch <- Result{Err: errC}
return
}

for {
in, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}

if err != nil {
ch <- Result{Err: fmt.Errorf("stream.Recv: %w", err)}
}

ch <- Result{Item: convertFeedToItem(in)}
}
}()

// todo: start getting data from core storage
if slices.Contains(req.SubscriptionTypes, internalproto.FeedItemType_FEED_ITEM_TYPE_VOTE) {
log.Error().Msg("implement getting data from core storage")
}

return ch
}

func convertTypesToFeedProto(list []internalproto.FeedItemType) []feedproto.FeedItemType {
res := make([]feedproto.FeedItemType, 0, len(list))
for _, item := range list {
res = append(res, feedproto.FeedItemType(item))
}

return res
}

func convertFeedToItem(in *feedproto.FeedItem) *internalproto.FeedItem {
if in == nil {
return nil
}

item := &internalproto.FeedItem{
CreatedAt: in.GetCreatedAt(),
UpdatedAt: in.GetUpdatedAt(),
Type: internalproto.FeedItemType(in.GetType()),
}

switch in.GetSnapshot().(type) {
case *feedproto.FeedItem_Dao:
item.Snapshot = convertFeedDao(in.GetSnapshot().(*feedproto.FeedItem_Dao))
case *feedproto.FeedItem_Proposal:
item.Snapshot = convertFeedProposal(in.GetSnapshot().(*feedproto.FeedItem_Proposal))
case *feedproto.FeedItem_Delegate:
item.Snapshot = convertFeedDelegate(in.GetSnapshot().(*feedproto.FeedItem_Delegate))
}

return item
}

func convertFeedDao(in *feedproto.FeedItem_Dao) *internalproto.FeedItem_Dao {
if in == nil || in.Dao == nil {
return nil
}

return &internalproto.FeedItem_Dao{
Dao: &internalproto.DAO{
CreatedAt: in.Dao.GetCreatedAt(),
UpdatedAt: in.Dao.GetUpdatedAt(),
InternalId: in.Dao.GetInternalId(),
OriginalId: in.Dao.GetOriginalId(),
Name: in.Dao.GetName(),
Avatar: in.Dao.GetAvatar(),
PopularityIndex: in.Dao.GetPopularityIndex(),
Verified: in.Dao.GetVerified(),
Timeline: convertTimeline(in.Dao.GetTimeline()),
},
}
}

func convertFeedProposal(in *feedproto.FeedItem_Proposal) *internalproto.FeedItem_Proposal {
if in == nil || in.Proposal == nil {
return nil
}

return &internalproto.FeedItem_Proposal{
Proposal: &internalproto.Proposal{
CreatedAt: in.Proposal.GetCreatedAt(),
UpdatedAt: in.Proposal.GetUpdatedAt(),
Id: in.Proposal.GetId(),
DaoInternalId: in.Proposal.GetDaoInternalId(),
Author: in.Proposal.GetAuthor(),
Title: in.Proposal.GetTitle(),
State: in.Proposal.GetState(),
Spam: in.Proposal.GetSpam(),
Timeline: convertTimeline(in.Proposal.GetTimeline()),
},
}
}

func convertFeedDelegate(in *feedproto.FeedItem_Delegate) *internalproto.FeedItem_Delegate {
if in == nil || in.Delegate == nil {
return nil
}

return &internalproto.FeedItem_Delegate{
Delegate: &internalproto.Delegate{
AddressFrom: in.Delegate.GetAddressFrom(),
AddressTo: in.Delegate.GetAddressTo(),
DaoInternalId: in.Delegate.GetDaoInternalId(),
ProposalId: in.Delegate.GetProposalId(),
},
}
}

func convertTimeline(in []*feedproto.Timeline) []*internalproto.Timeline {
result := make([]*internalproto.Timeline, 0, len(in))
for _, block := range in {
result = append(result, &internalproto.Timeline{
Action: block.GetAction(),
CreatedAt: block.GetCreatedAt(),
})
}

return result
}
Loading
Loading