diff --git a/Makefile b/Makefile index 811f052..9ca08a2 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,10 @@ up_build: build_app docker-compose up --build -d @echo "Docker images built and started!" +## restart: stops and then rebuilds and restarts docker-compose. +restart: down up_build + @echo "Restarting docker images..." + ## down: stop docker compose. down: @echo "Stopping docker compose..." diff --git a/docker-compose.yml b/docker-compose.yml index f1be56a..87e666d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,34 @@ services: timeout: 2s retries: 5 + zookeeper: + image: 'wurstmeister/zookeeper:latest' + ports: + - 2181:2181 + restart: always + + kafka: + image: 'wurstmeister/kafka:latest' + ports: + - 9092:9092 + - 29092:29092 + restart: always + environment: + KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + healthcheck: + test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ] + interval: 3s + timeout: 2s + retries: 10 + volumes: + - /etc/timezone:/etc/timezone:ro + - /etc/localtime:/etc/localtime:ro + api-app: build: context: . @@ -28,6 +56,8 @@ services: depends_on: postgres: condition: service_healthy + kafka: + condition: service_healthy ports: - "8080:8080" restart: always diff --git a/go.mod b/go.mod index be5ba22..b4ba7eb 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,9 @@ require ( github.com/golang/mock v1.6.0 github.com/jackc/pgx/v5 v5.4.3 github.com/kelseyhightower/envconfig v1.4.0 + github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 + github.com/segmentio/kafka-go v0.4.47 github.com/stretchr/testify v1.9.0 github.com/tsenart/vegeta/v12 v12.11.1 golang.org/x/tools v0.22.0 @@ -25,8 +27,10 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect diff --git a/go.sum b/go.sum index 72e2614..569e833 100644 --- a/go.sum +++ b/go.sum @@ -31,13 +31,19 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= @@ -47,35 +53,58 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 h1:Lt9DzQALzHoDwMBGJ6v8ObDPR0dzr2a6sXTB1Fq7IHs= github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/streadway/quantile v0.0.0-20220407130108-4246515d968d h1:X4+kt6zM/OVO6gbJdAfJR60MGPsqCzbtXNnjoGqdfAs= github.com/streadway/quantile v0.0.0-20220407130108-4246515d968d/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tsenart/vegeta/v12 v12.11.1 h1:Rbwe7Zxr7sJ+BDTReemeQalYPvKiSV+O7nwmUs20B3E= github.com/tsenart/vegeta/v12 v12.11.1/go.mod h1:swiFmrgpqj2llHURgHYFRFN0tfrIrlnspg01HjwOnSQ= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -83,15 +112,32 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/app/config/config.go b/internal/app/config/config.go index dc104f4..121efed 100644 --- a/internal/app/config/config.go +++ b/internal/app/config/config.go @@ -1,15 +1,15 @@ package config import ( - "github.com/vladyslavpavlenko/genesis-api-project/internal/email" + "github.com/vladyslavpavlenko/genesis-api-project/internal/notifier" ) // AppConfig holds the application config. type AppConfig struct { - EmailConfig email.Config + Notifier notifier.Notifier } // NewAppConfig creates a new AppConfig. func NewAppConfig() *AppConfig { - return &AppConfig{EmailConfig: email.Config{}} + return &AppConfig{} } diff --git a/internal/app/run.go b/internal/app/run.go index 58335d8..8f4ecfc 100644 --- a/internal/app/run.go +++ b/internal/app/run.go @@ -11,12 +11,17 @@ import ( "syscall" "time" + "github.com/vladyslavpavlenko/genesis-api-project/internal/notifier" + schedulerpkg "github.com/vladyslavpavlenko/genesis-api-project/pkg/scheduler" + + producerpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/producer" + + consumerpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/email/consumer" + "github.com/robfig/cron/v3" "github.com/vladyslavpavlenko/genesis-api-project/internal/app/config" - "github.com/vladyslavpavlenko/genesis-api-project/internal/handlers" "github.com/vladyslavpavlenko/genesis-api-project/internal/handlers/routes" - schedulerpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/scheduler" ) const ( @@ -24,78 +29,112 @@ const ( schedule = "0 10 * * *" ) -// scheduler defines an interface for scheduling tasks. +// scheduler is an interface for task scheduling. type scheduler interface { - ScheduleTask(schedule string, task func()) (cron.EntryID, error) + Schedule(schedule string, task func()) (cron.EntryID, error) Start() Stop() } -// db defines an interface for the database. -type db interface { - Connect(dsn string) error - Close() error - Migrate() error +// consumer is an interface for event consumption. +type consumer interface { + Consume(ctx context.Context) +} + +// producer is an interface for event producing. +type producer interface { + NewTopic(topic string, partitions int, replicationFactor int) error + SetTopic(topic string) + Produce(ctx context.Context, frequency time.Duration, topic string, partition int) } -// Run initializes the application, sets up the database, schedules email tasks, and starts the -// HTTP server with graceful shutdown. +// Run is the application running process. func Run(appConfig *config.AppConfig) error { - dbConn, err := setup(appConfig) + appServices, err := setup(appConfig) if err != nil { return err } - defer func() { - if closeErr := dbConn.Close(); closeErr != nil { - log.Printf("Error closing the database connection: %v\n", closeErr) - } - }() + defer appServices.DBConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() s := schedulerpkg.NewCronScheduler() - err = scheduleEmails(s) - if err != nil { + if err = scheduleEmails(s, appServices.Notifier); err != nil { return fmt.Errorf("failed to schedule emails: %w", err) } s.Start() defer s.Stop() - log.Printf("Running on port %d", webPort) + kafkaURL := os.Getenv("KAFKA_URL") + kafkaTopic := "emails-topic" + + kafkaProducer, err := producerpkg.NewKafkaProducer(kafkaURL, appServices.Outbox, appServices.DBConn) + if err != nil { + return fmt.Errorf("failed to create kafka producer: %w", err) + } + defer kafkaProducer.Writer.Close() + + err = kafkaProducer.NewTopic(kafkaTopic, 1, 1) + if err != nil { + return fmt.Errorf("failed to create topic: %w", err) + } + kafkaProducer.SetTopic(kafkaTopic) + go eventProducer(ctx, kafkaProducer, kafkaTopic, 1) + + kafkaGroupID := "emails-group" + + kafkaConsumer, err := consumerpkg.NewKafkaConsumer( + kafkaURL, + kafkaTopic, + 0, + kafkaGroupID, + appServices.Sender, + appServices.DBConn) + if err != nil { + return fmt.Errorf("failed to create kafka consumer: %w", err) + } + defer kafkaConsumer.Reader.Close() + go eventConsumer(ctx, kafkaConsumer) + log.Printf("Running on port %d", webPort) srv := &http.Server{ Addr: fmt.Sprintf(":%d", webPort), Handler: routes.Routes(), ReadHeaderTimeout: 5 * time.Second, } - // Graceful shutdown + handleShutdown(srv, cancel) + + return nil +} + +// handleShutdown handles a graceful shutdown of the application. +func handleShutdown(srv *http.Server, cancelFunc context.CancelFunc) { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) go func() { - if err = srv.ListenAndServe(); err != nil && !errors.Is(http.ErrServerClosed, err) { - log.Fatalf("HTTP server ListenAndServe: %v", err) + <-stop + cancelFunc() // Cancel context to shut down dispatcher + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + log.Println("Shutting down server...") + if err := srv.Shutdown(ctx); err != nil { + log.Printf("HTTP server shutdown failed: %v", err) } + log.Println("Server has been stopped") }() - // Block until a signal is received - <-stop - - // Set a deadline - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - log.Println("Shutting down...") - if err = srv.Shutdown(ctx); err != nil { - return fmt.Errorf("server shutdown failed: %v", err) + if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("HTTP server ListenAndServe: %v", err) } - - log.Println("Server has been stopped") - return nil } -// scheduleEmails uses the provided Scheduler to set up the mailing function. -func scheduleEmails(s scheduler) error { - _, err := s.ScheduleTask(schedule, func() { - err := handlers.Repo.NotifySubscribers() +// scheduleEmails sets up a mailing process. +func scheduleEmails(s scheduler, n *notifier.Notifier) error { + _, err := s.Schedule(schedule, func() { + err := n.Start() if err != nil { log.Printf("Error notifying subscribers: %v", err) } @@ -106,3 +145,21 @@ func scheduleEmails(s scheduler) error { return nil } + +// eventProducer runs an event dispatcher. +func eventProducer(ctx context.Context, producer producer, topic string, partition int) { + producer.Produce(ctx, 10*time.Second, topic, partition) + + // Wait for context cancellation to handle graceful shutdown + <-ctx.Done() + log.Println("Shutting down event producer...") +} + +// eventConsumer runs an event dispatcher. +func eventConsumer(ctx context.Context, c consumer) { + go c.Consume(ctx) + + // Wait for context cancellation to handle graceful shutdown + <-ctx.Done() + log.Println("Shutting down event consumer...") +} diff --git a/internal/app/setup.go b/internal/app/setup.go index d35099f..1738251 100644 --- a/internal/app/setup.go +++ b/internal/app/setup.go @@ -1,14 +1,19 @@ package app import ( - "errors" "fmt" "log" "net/http" - "github.com/vladyslavpavlenko/genesis-api-project/internal/dbrepo" + notifierpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/notifier" + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/gormoutbox" + producerpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/producer" + + outboxpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" "github.com/vladyslavpavlenko/genesis-api-project/internal/app/config" + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormrepo" "github.com/vladyslavpavlenko/genesis-api-project/internal/rateapi" "github.com/vladyslavpavlenko/genesis-api-project/internal/rateapi/chain" @@ -19,20 +24,26 @@ import ( "github.com/vladyslavpavlenko/genesis-api-project/internal/handlers" ) -type ( - // envVariables holds environment variables used in the application. - envVariables struct { - DBURL string `envconfig:"DB_URL"` - DBPort string `envconfig:"DB_PORT"` - DBUser string `envconfig:"DB_USER"` - DBPass string `envconfig:"DB_PASS"` - DBName string `envconfig:"DB_NAME"` - EmailAddr string `envconfig:"EMAIL_ADDR"` - EmailPass string `envconfig:"EMAIL_PASS"` - } -) +// envVariables holds environment variables used in the application. +type envVariables struct { + DBURL string `envconfig:"DB_URL"` + DBPort string `envconfig:"DB_PORT"` + DBUser string `envconfig:"DB_USER"` + DBPass string `envconfig:"DB_PASS"` + DBName string `envconfig:"DB_NAME"` + EmailAddr string `envconfig:"EMAIL_ADDR"` + EmailPass string `envconfig:"EMAIL_PASS"` +} -func setup(app *config.AppConfig) (db, error) { +type services struct { + DBConn *gormrepo.Connection + Sender *email.GomailSender + Fetcher *chain.Node + Notifier *notifierpkg.Notifier + Outbox producerpkg.Outbox +} + +func setup(app *config.AppConfig) (*services, error) { envs, err := readEnv() if err != nil { return nil, fmt.Errorf("error reading the .env file: %w", err) @@ -55,17 +66,32 @@ func setup(app *config.AppConfig) (db, error) { return nil, fmt.Errorf("error runnning database migrations: %w", err) } - app.EmailConfig, err = email.NewEmailConfig(envs.EmailAddr, envs.EmailPass) + fetcher := setupFetchersChain(&http.Client{}) + + sender, err := setupSender(&envs) + if err != nil { + return nil, fmt.Errorf("failed set up sender: %w", err) + } + + outbox, err := gormoutbox.NewOutbox(dbConn) if err != nil { - return nil, errors.New("error setting up email configuration") + return nil, fmt.Errorf("failed to create outbox: %w", err) } - services := setupServices(&envs, dbConn, &http.Client{}) + notifier := notifierpkg.NewNotifier(dbConn, fetcher, outbox) - repo := handlers.NewRepo(app, services) + repo := handlers.NewRepo(app, &handlers.Services{ + Fetcher: fetcher, + Notifier: notifier, + }, dbConn) handlers.NewHandlers(repo) - return dbConn, nil + return &services{ + DBConn: dbConn, + Sender: sender, + Fetcher: fetcher, + Outbox: outbox, + }, nil } // readEnv reads and returns the environmental variables as an envVariables object. @@ -79,22 +105,22 @@ func readEnv() (envVariables, error) { } // connectDB sets up a GORM database connection and returns an interface. -func connectDB(dsn string) (*dbrepo.GormDB, error) { - var db dbrepo.GormDB +func connectDB(dsn string) (*gormrepo.Connection, error) { + var conn gormrepo.Connection - err := db.Connect(dsn) + err := conn.Setup(dsn) if err != nil { return nil, err } - return &db, nil + return &conn, nil } // migrateDB runs database migrations. -func migrateDB(db *dbrepo.GormDB) error { +func migrateDB(conn *gormrepo.Connection) error { log.Println("Running migrations...") - err := db.Migrate() + err := conn.Migrate(&models.Subscription{}, &outboxpkg.Event{}) if err != nil { return fmt.Errorf("error running migrations: %w", err) } @@ -104,22 +130,20 @@ func migrateDB(db *dbrepo.GormDB) error { return nil } -// setupServices sets up handlers.Services. -func setupServices(envs *envVariables, dbConn *dbrepo.GormDB, client *http.Client) *handlers.Services { - fetcher := setupFetchersChain(client) - subscriber := dbrepo.NewSubscriptionRepository(dbConn) - sender := &email.GomailSender{ - Dialer: gomail.NewDialer("smtp.gmail.com", 587, envs.EmailAddr, envs.EmailPass), +// setupSender sets up a Sender service. +func setupSender(envs *envVariables) (sender *email.GomailSender, err error) { + emailConfig, err := email.NewEmailConfig(envs.EmailAddr, envs.EmailPass) + if err != nil { + return nil, fmt.Errorf("error creating email config: %w", err) } - return &handlers.Services{ - Subscriber: subscriber, - Fetcher: fetcher, - Sender: sender, - } + return &email.GomailSender{ + Dialer: gomail.NewDialer("smtp.gmail.com", 587, envs.EmailAddr, envs.EmailPass), + Config: emailConfig, + }, nil } -// setupServices sets up a chain of responsibility for fetchers. +// setupFetchersChain sets up a chain of responsibility for fetchers. func setupFetchersChain(client *http.Client) *chain.Node { coinbaseFetcher := rateapi.NewFetcherWithLogger("coinbase", rateapi.NewCoinbaseFetcher(client)) diff --git a/internal/dbrepo/gorm_driver.go b/internal/dbrepo/gorm_driver.go deleted file mode 100644 index 49c45ba..0000000 --- a/internal/dbrepo/gorm_driver.go +++ /dev/null @@ -1,66 +0,0 @@ -package dbrepo - -import ( - "fmt" - "log" - "time" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/models" - - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -type GormDB struct { - DB *gorm.DB -} - -// Connect implements the DB interface for GormDB. -func (g *GormDB) Connect(dsn string) error { - var counts int64 - for { - db, err := openDB(dsn) - if err != nil { - log.Printf("Postgres not yet ready... Attempt: %d\n", counts) - counts++ - } else { - log.Println("Connected to Postgres!") - g.DB = db - return nil - } - - if counts > 10 { - log.Println("Maximum retry attempts exceeded:", err) - return err - } - - log.Println("Backing off for two seconds...") - time.Sleep(2 * time.Second) - } -} - -// openDB initializes a new database connection. -func openDB(dsn string) (*gorm.DB, error) { - db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) - if err != nil { - return nil, err - } - return db, nil -} - -func (g *GormDB) Close() error { - sqlDB, err := g.DB.DB() - if err != nil { - return err - } - return sqlDB.Close() -} - -func (g *GormDB) Migrate() error { - err := g.DB.AutoMigrate(&models.Subscription{}) - if err != nil { - return fmt.Errorf("error during migration: %w", err) - } - - return nil -} diff --git a/internal/dbrepo/gorm_subscriber.go b/internal/dbrepo/gorm_subscriber.go deleted file mode 100644 index 4cfec84..0000000 --- a/internal/dbrepo/gorm_subscriber.go +++ /dev/null @@ -1,57 +0,0 @@ -package dbrepo - -import ( - "errors" - "time" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/email" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/models" - - "github.com/jackc/pgx/v5/pgconn" -) - -var ErrDuplicateSubscription = errors.New("subscription already exists") - -// SubscriberService is a models.Subscription repository. -type SubscriberService struct { - *GormDB -} - -// NewSubscriptionRepository creates a new GormSubscriptionRepository. -func NewSubscriptionRepository(db *GormDB) *SubscriberService { - return &SubscriberService{db} -} - -// AddSubscription creates a new Subscription record. -func (s *SubscriberService) AddSubscription(emailAddr string) error { - if !email.Email(emailAddr).Validate() { - return errors.New("invalid email") - } - subscription := models.Subscription{ - Email: emailAddr, - CreatedAt: time.Now(), - } - result := s.DB.Create(&subscription) - if result.Error != nil { - var pgErr *pgconn.PgError - if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" { - return ErrDuplicateSubscription - } - return result.Error - } - return nil -} - -// GetSubscriptions returns a paginated list of subscriptions. Limit specify the number of records to be retrieved -// Limit conditions can be canceled by using `Limit(-1)`. Offset specify the number of records to skip before starting -// to return the records. Offset conditions can be canceled by using `Offset(-1)`. -func (s *SubscriberService) GetSubscriptions(limit, offset int) ([]models.Subscription, error) { - var subscriptions []models.Subscription - result := s.DB.Limit(limit).Offset(offset).Find(&subscriptions) - if result.Error != nil { - return nil, result.Error - } - - return subscriptions, nil -} diff --git a/internal/email/consumer/consumed_event.go b/internal/email/consumer/consumed_event.go new file mode 100644 index 0000000..8db0f43 --- /dev/null +++ b/internal/email/consumer/consumed_event.go @@ -0,0 +1,16 @@ +package consumer + +import ( + "time" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" +) + +// ConsumedEvent represents an event consumed by the consumer. +type ConsumedEvent struct { + ID uint `gorm:"not null;index"` + Event outbox.Event `gorm:"foreignKey:ID" json:"-"` + Data string + ConsumedAt time.Time + UpdatedAt time.Time +} diff --git a/internal/email/consumer/kafka_consumer.go b/internal/email/consumer/kafka_consumer.go new file mode 100644 index 0000000..1a59281 --- /dev/null +++ b/internal/email/consumer/kafka_consumer.go @@ -0,0 +1,115 @@ +package consumer + +import ( + "context" + "fmt" + "log" + "strconv" + "time" + + "github.com/pkg/errors" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/email" + + "github.com/segmentio/kafka-go" +) + +type Sender interface { + Send(params email.Params) error +} + +type dbConnection interface { + Migrate(models ...any) error + AddConsumedEvent(event ConsumedEvent) error +} + +type KafkaConsumer struct { + db dbConnection + Reader *kafka.Reader + Sender Sender +} + +// NewKafkaConsumer initializes a new KafkaConsumer. +func NewKafkaConsumer(kafkaURL, topic string, partition int, groupID string, sender Sender, db dbConnection) (*KafkaConsumer, error) { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{kafkaURL}, + Topic: topic, + Partition: partition, + GroupID: groupID, + CommitInterval: 0, // disable auto-commit + }) + + err := db.Migrate(&ConsumedEvent{}) + if err != nil { + return nil, errors.Wrap(err, "failed to migrate offset") + } + + return &KafkaConsumer{Reader: reader, Sender: sender, db: db}, nil +} + +// Consume is a worker that consumes messages from Kafka and processes them +// to send an email using the Sender interface. +func (c *KafkaConsumer) Consume(ctx context.Context) { + for { + // Attempt to fetch a message from Kafka + m, err := c.Reader.FetchMessage(ctx) + if err != nil { + log.Printf("Failed to read message: %v", err) + continue + } + + // Attempt to deserialize the fetched message + data, err := outbox.DeserializeData(m.Value) + if err != nil { + log.Printf("Failed to deserialize data from message at offset %d: %v", m.Offset, err) + continue + } + + // Send a message + c.sendMessage(data) + + // Create a record of the consumed event + keyString := string(m.Key) + eventID, err := strconv.ParseUint(keyString, 10, 64) + if err != nil { + log.Printf("Failed to parse event ID from key: %v", err) + continue + } + + consumedEvent := ConsumedEvent{ + ID: uint(eventID), + Data: data.Serialize(), + ConsumedAt: time.Now(), + } + + // Attempt to add the consumed event to the database + if err = c.db.AddConsumedEvent(consumedEvent); err != nil { + log.Printf("Failed to record consumed event at offset %d: %v", m.Offset, err) + continue + } + + // Commit the offset back to Kafka to mark the message as processed + if err = c.Reader.CommitMessages(ctx, m); err != nil { + log.Printf("Failed to commit message offset %d: %v", m.Offset, err) + } else { + log.Printf("Offset committed successfully for message at offset: %d", m.Offset) + } + } +} + +func (c *KafkaConsumer) sendMessage(data outbox.Data) { + params := email.Params{ + To: data.Email, + Subject: "USD to UAH Exchange Rate", + Body: fmt.Sprintf("The current exchange rate for USD to UAH is %.2f.", data.Rate), + } + + log.Printf("Sending email to %s", data.Email) + + err := c.Sender.Send(params) + if err != nil { + log.Printf("Failed to send email: %v", err) + } +} diff --git a/internal/email/gomail_sender.go b/internal/email/gomail_sender.go index 1f4a2b6..3b2e191 100644 --- a/internal/email/gomail_sender.go +++ b/internal/email/gomail_sender.go @@ -7,6 +7,7 @@ import ( // GomailSender implements the Sender interface for Gomail. type GomailSender struct { Dialer Dialer + Config Config } type GomailDialer struct { @@ -17,9 +18,9 @@ func (d *GomailDialer) DialAndSend(m ...*gomail.Message) error { return d.Dialer.DialAndSend(m...) } -func (gs *GomailSender) Send(cfg Config, params Params) error { +func (gs *GomailSender) Send(params Params) error { m := gomail.NewMessage() - m.SetHeader("From", cfg.Email) + m.SetHeader("From", gs.Config.Email) m.SetHeader("To", params.To) m.SetHeader("Subject", params.Subject) m.SetBody("text/plain", params.Body) diff --git a/internal/email/gomail_sender_test.go b/internal/email/gomail_sender_test.go index 4afe8d6..f5fd4c3 100644 --- a/internal/email/gomail_sender_test.go +++ b/internal/email/gomail_sender_test.go @@ -17,12 +17,11 @@ func TestSend(t *testing.T) { mockDialer := mocks.NewMockDialer(ctrl) gomailSender := email.GomailSender{Dialer: mockDialer} - config := email.Config{Email: "test@example.com", Password: "password"} params := email.Params{To: "recipient@example.com", Subject: "Test", Body: "Hello"} mockDialer.EXPECT().DialAndSend(gomock.Any()).Return(nil) - err := gomailSender.Send(config, params) + err := gomailSender.Send(params) if err != nil { t.Errorf("Send failed: %v", err) } @@ -35,13 +34,12 @@ func TestGomailSenderSendFailure(t *testing.T) { mockDialer := mocks.NewMockDialer(ctrl) sender := email.GomailSender{Dialer: mockDialer} - cfg := email.Config{Email: "test@example.com", Password: "password"} params := email.Params{To: "recipient@example.com", Subject: "Failure Test", Body: "This email should encounter a send error."} testError := errors.New("smtp error") mockDialer.EXPECT().DialAndSend(gomock.Any()).Return(testError) - err := sender.Send(cfg, params) + err := sender.Send(params) assert.Equal(t, testError, err, "Expected a specific error, but got a different one") } diff --git a/internal/email/sender_test.go b/internal/email/sender_test.go deleted file mode 100644 index 0583bcf..0000000 --- a/internal/email/sender_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package email_test - -// -// import ( -// "errors" -// "sync" -// "testing" -// -// "github.com/vladyslavpavlenko/genesis-api-project/internal/email" -//) -// -// func TestSendEmail_InvalidEmail(_ *testing.T) { -// var wg sync.WaitGroup -// wg.Add(1) -// -// cfg := email.Config{ -// Email: "invalidemail", -// Password: "password", -// } -// -// params := email.Params{ -// To: "recipient@example.com", -// Subject: "Test Subject", -// Body: "Test Body", -// } -// -// mockSender := MockEmailSender{ -// SendFunc: func(_ email.Config, _ email.Params) error { -// return errors.New("invalid email address") -// }, -// } -// -// go email.SendEmail(mockSender, cfg, params) -// wg.Wait() -// } diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index 96e01ae..b707d4a 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" + emailpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/email" + "github.com/vladyslavpavlenko/genesis-api-project/pkg/jsonutils" ) @@ -38,49 +40,73 @@ func (m *Repository) GetRate(w http.ResponseWriter, r *http.Request) { _ = jsonutils.WriteJSON(w, http.StatusOK, payload) } -// subscriptionBody is the email subscription request body structure. -type subscriptionBody struct { - Email string `json:"email"` +// parseEmailFromRequest parses the email from the multipart form and validates it. +func parseEmailFromRequest(r *http.Request) (string, error) { + err := r.ParseMultipartForm(10 << 20) + if err != nil { + return "", errors.New("failed to parse form") + } + + emailAddr := r.FormValue("email") + if emailAddr == "" { + return "", errors.New("email is required") + } + + if !emailpkg.Email(emailAddr).Validate() { + return "", errors.New("invalid email") + } + + return emailAddr, nil } // Subscribe handles the `/subscribe` request. func (m *Repository) Subscribe(w http.ResponseWriter, r *http.Request) { - // Parse the form - var body subscriptionBody + email, err := parseEmailFromRequest(r) + if err != nil { + _ = jsonutils.ErrorJSON(w, err) + return + } - err := r.ParseMultipartForm(10 << 20) + err = m.DB.AddSubscription(email) if err != nil { - _ = jsonutils.ErrorJSON(w, errors.New("failed to parse form")) + _ = jsonutils.ErrorJSON(w, err, http.StatusInternalServerError) return } - body.Email = r.FormValue("email") - if body.Email == "" { - _ = jsonutils.ErrorJSON(w, errors.New("email is required")) + payload := jsonutils.Response{ + Error: false, + Message: "subscribed", + } + + _ = jsonutils.WriteJSON(w, http.StatusOK, payload) +} + +// Unsubscribe handles the `/unsubscribe` request. +func (m *Repository) Unsubscribe(w http.ResponseWriter, r *http.Request) { + email, err := parseEmailFromRequest(r) + if err != nil { + _ = jsonutils.ErrorJSON(w, err) return } - // Perform the subscription operation - err = m.Services.Subscriber.AddSubscription(body.Email) + err = m.DB.DeleteSubscription(email) if err != nil { _ = jsonutils.ErrorJSON(w, err, http.StatusInternalServerError) return } - // AddSubscription a response payload := jsonutils.Response{ Error: false, - Message: "subscribed", + Message: "unsubscribed", } - // Send the response back _ = jsonutils.WriteJSON(w, http.StatusOK, payload) } // SendEmails handles the `/sendEmails` request. func (m *Repository) SendEmails(w http.ResponseWriter, _ *http.Request) { - // Perform the mailing operation - err := m.NotifySubscribers() + // Produce mailing events + err := m.Services.Notifier.Start() if err != nil { _ = jsonutils.ErrorJSON(w, err, http.StatusInternalServerError) return @@ -88,8 +114,7 @@ func (m *Repository) SendEmails(w http.ResponseWriter, _ *http.Request) { // AddSubscription a response payload := jsonutils.Response{ - Error: false, - Message: "sent", + Error: false, } // Send the response back diff --git a/internal/handlers/notifier.go b/internal/handlers/notifier.go deleted file mode 100644 index 301c49f..0000000 --- a/internal/handlers/notifier.go +++ /dev/null @@ -1,89 +0,0 @@ -package handlers - -import ( - "context" - "fmt" - "log" - "strconv" - "sync" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/models" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/email" -) - -const batchSize = 100 - -type ( - // Sender defines an interface for sending emails. - Sender interface { - Send(emailConfig email.Config, params email.Params) error - } - - // Fetcher interface defines an interface for fetching rates. - Fetcher interface { - Fetch(ctx context.Context, base, target string) (string, error) - } - - // Subscriber interface defines methods to access models.Subscription data. - Subscriber interface { - AddSubscription(string) error - GetSubscriptions(limit, offset int) ([]models.Subscription, error) - } -) - -// NotifySubscribers handles sending currency update emails to all the subscribers in batches. -func (m *Repository) NotifySubscribers() error { - var offset int - for { - subscriptions, err := m.Services.Subscriber.GetSubscriptions(batchSize, offset) - if err != nil { - return err - } - if len(subscriptions) == 0 { - break - } - - var wg sync.WaitGroup - for _, subscription := range subscriptions { - wg.Add(1) - go func(sub models.Subscription) { - defer wg.Done() - if err = m.sendEmail(sub); err != nil { - log.Println(err) - } - }(subscription) - } - wg.Wait() - - offset += batchSize - } - - return nil -} - -// sendEmail is a controller function to prepare and send emails -func (m *Repository) sendEmail(subscription models.Subscription) error { - price, err := m.Services.Fetcher.Fetch(context.Background(), "USD", "UAH") - if err != nil { - return fmt.Errorf("failed to retrieve rate: %w", err) - } - - floatPrice, err := strconv.ParseFloat(price, 32) - if err != nil { - return fmt.Errorf("failed to parse price: %w", err) - } - - params := email.Params{ - To: subscription.Email, - Subject: "USD to UAH Exchange Rate", - Body: fmt.Sprintf("The current exchange rate for USD to UAH is %.2f.", floatPrice), - } - - err = m.Services.Sender.Send(m.App.EmailConfig, params) - if err != nil { - return fmt.Errorf("failed to send email: %w", err) - } - - return nil -} diff --git a/internal/handlers/notifier_test.go b/internal/handlers/notifier_test.go deleted file mode 100644 index 9b63a64..0000000 --- a/internal/handlers/notifier_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package handlers_test - -import ( - "testing" - - "github.com/vladyslavpavlenko/genesis-api-project/internal/app/config" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/vladyslavpavlenko/genesis-api-project/internal/handlers" - "github.com/vladyslavpavlenko/genesis-api-project/internal/models" -) - -func TestNotifySubscribers_Success(t *testing.T) { - mockSubscriber := new(MockSubscriber) - mockFetcher := new(MockFetcher) - mockSender := new(MockSender) - appConfig := &config.AppConfig{} - services := setupServicesWithMocks(mockSubscriber, mockFetcher, mockSender) - repo := handlers.NewRepo(appConfig, services) - - subscribers := []models.Subscription{{Email: "user@example.com"}} - mockSubscriber.On("GetSubscriptions").Return(subscribers, nil) - mockFetcher.On("Fetch", mock.Anything, "USD", "UAH").Return("24.5", nil) - mockSender.On("Send", mock.AnythingOfType("email.Config"), mock.AnythingOfType("email.Params")).Return(nil) - - err := repo.NotifySubscribers() - - assert.NoError(t, err) - - mockSubscriber.AssertExpectations(t) - mockFetcher.AssertExpectations(t) - mockSender.AssertExpectations(t) -} diff --git a/internal/handlers/repository.go b/internal/handlers/repository.go index c2d3dd8..9dbdc11 100644 --- a/internal/handlers/repository.go +++ b/internal/handlers/repository.go @@ -2,30 +2,41 @@ package handlers import ( "github.com/vladyslavpavlenko/genesis-api-project/internal/app/config" + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + "github.com/vladyslavpavlenko/genesis-api-project/internal/notifier" + "github.com/vladyslavpavlenko/genesis-api-project/internal/rateapi" ) type ( // Services is the repository type. Services struct { - Subscriber Subscriber - Fetcher Fetcher - Sender Sender + Fetcher rateapi.Fetcher + Notifier *notifier.Notifier } // Repository is the repository type Repository struct { App *config.AppConfig + DB dbConnection Services *Services } + + // dbConnection defines an interface for the database connection. + dbConnection interface { + AddSubscription(emailAddr string) error + DeleteSubscription(emailAddr string) error + GetSubscriptions(limit, offset int) ([]models.Subscription, error) + } ) // Repo the repository used by the handlers var Repo *Repository // NewRepo creates a new Repository -func NewRepo(a *config.AppConfig, services *Services) *Repository { +func NewRepo(a *config.AppConfig, services *Services, conn dbConnection) *Repository { return &Repository{ App: a, + DB: conn, Services: services, } } diff --git a/internal/handlers/repository_test.go b/internal/handlers/repository_test.go index a5e7854..f0fc821 100644 --- a/internal/handlers/repository_test.go +++ b/internal/handlers/repository_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormrepo" + "github.com/stretchr/testify/mock" "github.com/vladyslavpavlenko/genesis-api-project/internal/email" "github.com/vladyslavpavlenko/genesis-api-project/internal/models" @@ -23,16 +25,21 @@ func (m *MockSender) Send(cfg email.Config, params email.Params) error { return args.Error(0) } -type MockSubscriber struct { +type MockDB struct { mock.Mock } -func (m *MockSubscriber) GetSubscriptions(_, _ int) ([]models.Subscription, error) { +func (m *MockDB) GetSubscriptions(_, _ int) ([]models.Subscription, error) { args := m.Called() return args.Get(0).([]models.Subscription), args.Error(1) } -func (m *MockSubscriber) AddSubscription(emailAddr string) error { +func (m *MockDB) AddSubscription(emailAddr string) error { + args := m.Called(emailAddr) + return args.Error(0) +} + +func (m *MockDB) DeleteSubscription(emailAddr string) error { args := m.Called(emailAddr) return args.Error(0) } @@ -46,24 +53,15 @@ func (m *MockFetcher) Fetch(ctx context.Context, base, target string) (string, e return args.String(0), args.Error(1) } -func setupServicesWithMocks(subscriber *MockSubscriber, fetcher *MockFetcher, sender *MockSender) *handlers.Services { - return &handlers.Services{ - Subscriber: subscriber, - Fetcher: fetcher, - Sender: sender, - } -} - // TestNewRepo tests the creation of a new repository func TestNewRepo(t *testing.T) { appConfig := &config.AppConfig{} services := &handlers.Services{ - Subscriber: &MockSubscriber{}, - Fetcher: &MockFetcher{}, - Sender: &MockSender{}, + Fetcher: &MockFetcher{}, } + dbConn := &gormrepo.Connection{} - repo := handlers.NewRepo(appConfig, services) + repo := handlers.NewRepo(appConfig, services, dbConn) assert.NotNil(t, repo) assert.Equal(t, appConfig, repo.App) @@ -74,12 +72,11 @@ func TestNewRepo(t *testing.T) { func TestNewHandlers(t *testing.T) { appConfig := &config.AppConfig{} services := &handlers.Services{ - Subscriber: &MockSubscriber{}, - Fetcher: &MockFetcher{}, - Sender: &MockSender{}, + Fetcher: &MockFetcher{}, } + dbConn := &gormrepo.Connection{} - repo := handlers.NewRepo(appConfig, services) + repo := handlers.NewRepo(appConfig, services, dbConn) handlers.NewHandlers(repo) assert.Equal(t, repo, handlers.Repo) diff --git a/internal/handlers/routes/routes.go b/internal/handlers/routes/routes.go index d6b5c28..0fcd615 100644 --- a/internal/handlers/routes/routes.go +++ b/internal/handlers/routes/routes.go @@ -18,6 +18,7 @@ func Routes() http.Handler { mux.Route("/v1", func(mux chi.Router) { mux.Get("/rate", handlers.Repo.GetRate) mux.Post("/subscribe", handlers.Repo.Subscribe) + mux.Post("/unsubscribe", handlers.Repo.Unsubscribe) mux.Post("/sendEmails", handlers.Repo.SendEmails) }) }) diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go new file mode 100644 index 0000000..a12ae35 --- /dev/null +++ b/internal/notifier/notifier.go @@ -0,0 +1,97 @@ +package notifier + +import ( + "context" + "fmt" + "strconv" + "sync" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + outboxpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" +) + +const batchSize = 100 + +// dbConnection defines an interface for the database connection. +type dbConnection interface { + GetSubscriptions(limit, offset int) ([]models.Subscription, error) +} + +// fetcher defines an interface for the fetching data rates. +type fetcher interface { + Fetch(ctx context.Context, base, target string) (string, error) +} + +// outbox defines an interface for writing events to the outbox. +type outbox interface { + AddEvent(data outboxpkg.Data) error +} + +type Notifier struct { + DB dbConnection + Fetcher fetcher + Outbox outbox +} + +// NewNotifier creates a new Notifier. +func NewNotifier(db dbConnection, f fetcher, o outbox) *Notifier { + return &Notifier{ + DB: db, + Fetcher: f, + Outbox: o, + } +} + +// Start handles producing events for currency rate update emails. +func (n *Notifier) Start() error { + rate, err := n.Fetcher.Fetch(context.Background(), "USD", "UAH") + if err != nil { + return fmt.Errorf("failed to retrieve rate: %w", err) + } + + floatRate, err := strconv.ParseFloat(rate, 64) + if err != nil { + return fmt.Errorf("failed to parse rate: %w", err) + } + + var offset int + errChan := make(chan error, 1) + for { + subscriptions, err := n.DB.GetSubscriptions(batchSize, offset) + if err != nil { + return err + } + if len(subscriptions) == 0 { + break + } + + var wg sync.WaitGroup + for _, sub := range subscriptions { + wg.Add(1) + go func(sub models.Subscription) { + defer wg.Done() + + data := outboxpkg.Data{ + Email: sub.Email, + Rate: floatRate, + } + if localErr := n.Outbox.AddEvent(data); localErr != nil { + select { + case errChan <- localErr: + default: + } + } + }(sub) + } + wg.Wait() + + select { + case err := <-errChan: + return err + default: + } + + offset += batchSize + } + return nil +} diff --git a/internal/outbox/event.go b/internal/outbox/event.go new file mode 100644 index 0000000..a8b90ff --- /dev/null +++ b/internal/outbox/event.go @@ -0,0 +1,40 @@ +package outbox + +import ( + "encoding/json" + "log" + "time" +) + +// Event is a query message model stored in the database. +type Event struct { + ID uint `gorm:"primaryKey"` + Data string + CreatedAt time.Time +} + +// Data is an event data model. +type Data struct { + Email string `json:"email"` + Rate float64 `json:"rate"` +} + +// Serialize takes a Data struct and serializes it to a JSON string. +func (d Data) Serialize() string { + bytes, err := json.Marshal(d) + if err != nil { + log.Println(err) + return "" + } + return string(bytes) +} + +// DeserializeData deserializes JSON string to Data struct +func DeserializeData(jsonData []byte) (Data, error) { + var data Data + err := json.Unmarshal(jsonData, &data) + if err != nil { + return Data{}, err + } + return data, nil +} diff --git a/internal/outbox/gormoutbox/outbox.go b/internal/outbox/gormoutbox/outbox.go new file mode 100644 index 0000000..b83556e --- /dev/null +++ b/internal/outbox/gormoutbox/outbox.go @@ -0,0 +1,41 @@ +package gormoutbox + +import ( + "time" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" + + "github.com/pkg/errors" +) + +// dbConnection defines an interface for the database connection. +type dbConnection interface { + Migrate(models ...any) error + AddEvent(event *outbox.Event) error +} + +// Outbox defines an interface for the transactional outbox. +type Outbox struct { + db dbConnection +} + +// NewOutbox creates `events` table to implement a transactional outbox. +// `events` table stores all the events ever occurred. +func NewOutbox(db dbConnection) (*Outbox, error) { + err := db.Migrate(&outbox.Event{}) + if err != nil { + return nil, errors.Wrap(err, "failed to migrate events") + } + return &Outbox{db: db}, nil +} + +// AddEvent creates a new Event record. +func (o *Outbox) AddEvent(data outbox.Data) error { + event := &outbox.Event{ + CreatedAt: time.Now(), + } + + event.Data = data.Serialize() + + return o.db.AddEvent(event) +} diff --git a/internal/outbox/producer/kafka_producer.go b/internal/outbox/producer/kafka_producer.go new file mode 100644 index 0000000..49f1632 --- /dev/null +++ b/internal/outbox/producer/kafka_producer.go @@ -0,0 +1,169 @@ +package producer + +import ( + "context" + "log" + "strconv" + "time" + + "github.com/pkg/errors" + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" + + "gorm.io/gorm" + + "github.com/segmentio/kafka-go" +) + +type Outbox interface { + AddEvent(data outbox.Data) error +} + +type dbConnection interface { + Migrate(models ...any) error + BeginTransaction() (*gorm.DB, error) + GetLastOffset(topic string, partition int) (Offset, error) + FetchUnpublishedEvents(lastOffset uint) ([]outbox.Event, error) + UpdateOffset(offset *Offset) error +} + +type KafkaProducer struct { + db dbConnection + Writer *kafka.Writer + Outbox Outbox +} + +// NewKafkaProducer initializes a new KafkaProducer. +func NewKafkaProducer(kafkaURL string, o Outbox, db dbConnection) (*KafkaProducer, error) { + w := &kafka.Writer{ + Addr: kafka.TCP(kafkaURL), + Balancer: &kafka.LeastBytes{}, + AllowAutoTopicCreation: true, + } + + err := db.Migrate(&Offset{}) + if err != nil { + return nil, errors.Wrap(err, "failed to migrate offset") + } + + return &KafkaProducer{Writer: w, Outbox: o, db: db}, nil +} + +// NewTopic creates a new kafka.TopicConfig if it does not exist. +func (p *KafkaProducer) NewTopic(topic string, partitions, replicationFactor int) error { + conn, err := kafka.Dial("tcp", p.Writer.Addr.String()) + if err != nil { + return err + } + defer conn.Close() + + topicConfig := kafka.TopicConfig{ + Topic: topic, + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + } + + err = conn.CreateTopics(topicConfig) + if err != nil { + return err + } + return nil +} + +// SetTopic changes the topic of the current kafka.Writer +func (p *KafkaProducer) SetTopic(topic string) { + p.Writer.Topic = topic +} + +// Produce fetches for unpublished events, publishes them, and marks them as published. +func (p *KafkaProducer) Produce(ctx context.Context, frequency time.Duration, topic string, partition int) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Println("Shutting down worker...") + return + case <-ticker.C: + p.processEvents(ctx, topic, partition) + } + } +} + +func (p *KafkaProducer) processEvents(ctx context.Context, topic string, partition int) { + // Start a transaction + tx, err := p.db.BeginTransaction() + if err != nil { + log.Printf("Failed to start transaction: %v", err) + return + } + log.Println("Transaction started successfully") + + defer func() { + if r := recover(); r != nil { + tx.Rollback() + log.Printf("Recovered from panic: %v, transaction rolled back", r) + } + }() + + // Retrieve the last processed offset + lastOffset, err := p.db.GetLastOffset(topic, partition) + if err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + tx.Rollback() + log.Printf("Failed to fetch last offset: %v", err) + return + } + + lastOffset = Offset{ + Topic: topic, + Partition: partition, + Offset: 0, + } + } else { + log.Printf("Last offset fetched: %d", lastOffset.Offset) + } + + // Fetch unpublished events based on the last offset + events, err := p.db.FetchUnpublishedEvents(lastOffset.Offset) + if err != nil { + tx.Rollback() + log.Printf("Failed to fetch unpublished events: %v", err) + return + } + log.Printf("Fetched %d unpublished events", len(events)) + + // Process each event + for _, event := range events { + key := []byte(strconv.Itoa(int(event.ID))) + msg := &kafka.Message{ + Key: key, + Value: []byte(event.Data), + Partition: partition, + } + + log.Printf("Preparing to send message with ID: %d", event.ID) + + if err = p.Writer.WriteMessages(ctx, *msg); err != nil { + tx.Rollback() + log.Printf("Failed to send Kafka message: %v", err) + return + } + log.Printf("Message with ID %d sent successfully", event.ID) + + lastOffset.Offset = event.ID + if err = p.db.UpdateOffset(&lastOffset); err != nil { + tx.Rollback() + log.Printf("Failed to update offset after sending message with ID %d: %v", event.ID, err) + return + } + log.Printf("Offset updated successfully for message ID: %d", event.ID) + } + + // Commit the transaction if all events are processed successfully + if err = tx.Commit().Error; err != nil { + log.Printf("Failed to commit transaction: %v", err) + return + } + log.Println("Transaction committed and all events processed successfully") +} diff --git a/internal/outbox/producer/offset.go b/internal/outbox/producer/offset.go new file mode 100644 index 0000000..0bd0d71 --- /dev/null +++ b/internal/outbox/producer/offset.go @@ -0,0 +1,8 @@ +package producer + +// Offset represents the last published event offset for a topic and partition. +type Offset struct { + Topic string `gorm:"primaryKey"` + Partition int `gorm:"primaryKey"` + Offset uint +} diff --git a/internal/storage/gormrepo/consumed_event.go b/internal/storage/gormrepo/consumed_event.go new file mode 100644 index 0000000..eb5fca8 --- /dev/null +++ b/internal/storage/gormrepo/consumed_event.go @@ -0,0 +1,19 @@ +package gormrepo + +import ( + "context" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/email/consumer" +) + +// AddConsumedEvent creates a new consumer.ConsumedEvent record. +func (c *Connection) AddConsumedEvent(event consumer.ConsumedEvent) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + result := c.db.WithContext(ctx).Create(event) + if result.Error != nil { + return result.Error + } + return nil +} diff --git a/internal/storage/gormrepo/driver.go b/internal/storage/gormrepo/driver.go new file mode 100644 index 0000000..55f8130 --- /dev/null +++ b/internal/storage/gormrepo/driver.go @@ -0,0 +1,89 @@ +package gormrepo + +import ( + "context" + "fmt" + "log" + "time" + + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +const timeout = time.Second * 5 + +type Connection struct { + db *gorm.DB +} + +// DB returns a pointer to gorm.DB. +func (c *Connection) DB() *gorm.DB { + return c.db +} + +// Setup sets up a new Connection. +func (c *Connection) Setup(dsn string) error { + var counts int64 + for { + db, err := openDB(dsn) + if err != nil { + log.Printf("Postgres not yet ready... Attempt: %d\n", counts) + counts++ + } else { + log.Println("Connected to Postgres!") + c.db = db + return nil + } + + if counts > 10 { + log.Println("Maximum retry attempts exceeded:", err) + return err + } + + log.Println("Backing off for two seconds...") + time.Sleep(2 * time.Second) + } +} + +// openDB initializes a new gorm.DB database connection. +func openDB(dsn string) (*gorm.DB, error) { + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, err + } + return db, nil +} + +// Close closes a database connection. +func (c *Connection) Close() error { + sqlDB, err := c.db.DB() + if err != nil { + return err + } + return sqlDB.Close() +} + +// Migrate performs a database migration for given models. +func (c *Connection) Migrate(models ...any) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err := c.db.WithContext(ctx).AutoMigrate(models...) + if err != nil { + return fmt.Errorf("error migrating models: %w", err) + } + + return nil +} + +// BeginTransaction begins a transaction. +func (c *Connection) BeginTransaction() (*gorm.DB, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + tx := c.db.WithContext(ctx).Begin() + if tx.Error != nil { + return nil, tx.Error + } + return tx, nil +} diff --git a/internal/storage/gormrepo/event.go b/internal/storage/gormrepo/event.go new file mode 100644 index 0000000..0b4f747 --- /dev/null +++ b/internal/storage/gormrepo/event.go @@ -0,0 +1,33 @@ +package gormrepo + +import ( + "context" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox" +) + +// AddEvent creates a new outbox.Event record. +func (c *Connection) AddEvent(event *outbox.Event) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + result := c.db.WithContext(ctx).Create(event) + if result.Error != nil { + return result.Error + } + return nil +} + +// FetchUnpublishedEvents retrieves all events from the database that have not been +// published after the specified offset. +func (c *Connection) FetchUnpublishedEvents(lastOffset uint) ([]outbox.Event, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var events []outbox.Event + err := c.db.WithContext(ctx).Where("id > ?", lastOffset).Find(&events).Error + if err != nil { + return nil, err + } + return events, nil +} diff --git a/internal/storage/gormrepo/offset.go b/internal/storage/gormrepo/offset.go new file mode 100644 index 0000000..dbcc22a --- /dev/null +++ b/internal/storage/gormrepo/offset.go @@ -0,0 +1,29 @@ +package gormrepo + +import ( + "context" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/producer" +) + +// GetLastOffset retrieves the last offset for a given topic from the database. +func (c *Connection) GetLastOffset(topic string, partition int) (producer.Offset, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var lastOffset producer.Offset + err := c.db.WithContext(ctx).Where("topic = ? AND partition = ?", topic, partition).First(&lastOffset).Error + if err != nil { + return producer.Offset{}, err + } + return lastOffset, nil +} + +// UpdateOffset updates the offset in the database to reflect the latest published +// event's ID. +func (c *Connection) UpdateOffset(offset *producer.Offset) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + return c.db.WithContext(ctx).Save(offset).Error +} diff --git a/internal/storage/gormrepo/subscription.go b/internal/storage/gormrepo/subscription.go new file mode 100644 index 0000000..912aa39 --- /dev/null +++ b/internal/storage/gormrepo/subscription.go @@ -0,0 +1,69 @@ +package gormrepo + +import ( + "context" + "errors" + "time" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + + "github.com/jackc/pgx/v5/pgconn" +) + +var ( + ErrorDuplicateSubscription = errors.New("subscription already exists") + ErrorNonExistentSubscription = errors.New("subscription does not exist") +) + +// AddSubscription creates a new models.Subscription record. +func (c *Connection) AddSubscription(email string) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + subscription := models.Subscription{ + Email: email, + CreatedAt: time.Now(), + } + result := c.db.WithContext(ctx).Create(&subscription) + if result.Error != nil { + var pgErr *pgconn.PgError + if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" { + return ErrorDuplicateSubscription + } + return result.Error + } + return nil +} + +// DeleteSubscription deletes a models.Subscription record. +func (c *Connection) DeleteSubscription(email string) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + result := c.db.WithContext(ctx).Where("email = ?", email).Delete(&models.Subscription{}) + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return ErrorNonExistentSubscription + } + + return nil +} + +// GetSubscriptions returns a paginated list of subscriptions. Limit specifies the number of records to be retrieved +// Limit conditions can be canceled by using `Limit(-1)`. Offset specify the number of records to skip before starting +// to return the records. Offset conditions can be canceled by using `Offset(-1)`. +func (c *Connection) GetSubscriptions(limit, offset int) ([]models.Subscription, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var subscriptions []models.Subscription + result := c.db.WithContext(ctx).Limit(limit).Offset(offset).Find(&subscriptions) + if result.Error != nil { + return nil, result.Error + } + + return subscriptions, nil +} diff --git a/internal/scheduler/cron_scheduler.go b/pkg/scheduler/cron_scheduler.go similarity index 78% rename from internal/scheduler/cron_scheduler.go rename to pkg/scheduler/cron_scheduler.go index 775d06e..3bb8e7c 100644 --- a/internal/scheduler/cron_scheduler.go +++ b/pkg/scheduler/cron_scheduler.go @@ -14,8 +14,8 @@ func NewCronScheduler() *CronScheduler { } } -// ScheduleTask schedules a given task to run at the specified cron schedule. -func (s *CronScheduler) ScheduleTask(schedule string, task func()) (cron.EntryID, error) { +// Schedule schedules a given task to run at the specified cron schedule. +func (s *CronScheduler) Schedule(schedule string, task func()) (cron.EntryID, error) { id, err := s.Cron.AddFunc(schedule, task) if err != nil { return 0, err diff --git a/internal/scheduler/cron_scheduler_test.go b/pkg/scheduler/cron_scheduler_test.go similarity index 78% rename from internal/scheduler/cron_scheduler_test.go rename to pkg/scheduler/cron_scheduler_test.go index 154943c..b46bf84 100644 --- a/internal/scheduler/cron_scheduler_test.go +++ b/pkg/scheduler/cron_scheduler_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/vladyslavpavlenko/genesis-api-project/internal/scheduler" + "github.com/vladyslavpavlenko/genesis-api-project/pkg/scheduler" ) func TestNewCronScheduler(t *testing.T) { @@ -16,12 +16,12 @@ func TestNewCronScheduler(t *testing.T) { func TestScheduleTask(t *testing.T) { s := scheduler.NewCronScheduler() - _, err := s.ScheduleTask("* * * * *", func() { t.Log("Task executed") }) + _, err := s.Schedule("* * * * *", func() { t.Log("Task executed") }) if err != nil { t.Errorf("Failed to schedule task with valid cron schedule: %v", err) } - _, err = s.ScheduleTask("invalid schedule", func() {}) + _, err = s.Schedule("invalid schedule", func() {}) if err == nil { t.Error("Expected error when scheduling task with invalid cron schedule, got none") } @@ -32,7 +32,7 @@ func TestStart(t *testing.T) { done := make(chan bool) wasRun := false - _, err := s.ScheduleTask("@every 1s", func() { + _, err := s.Schedule("@every 1s", func() { wasRun = true done <- true })