Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature/8-kafka-queues-connect] Add multiCreate and partially tracing #9

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@
# Dependency directories (remove the comment below to include it)
/vendor
/.idea
/bin
/bin
/third_party/db/postgres_data/
/third_party/kafka/data/
/third_party/zookeeper/
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ COPY --from=build-stage ${SRC_PATH}/bin/${RUNNING_BIN} /usr/local/bin/${RUNNING_
COPY --from=build-stage ${SRC_PATH}/migrations ${SRC_PATH}/migrations
WORKDIR ${SRC_PATH}

EXPOSE 1337

CMD ["ocp-chat-api"]
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ ifeq ($(TAG),)
TAG=$(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))
endif

build:
deps:
go mod tidy
go mod vendor

build:
go build -o bin/ocp-chat-api ./cmd/ocp-chat-api

run: build
Expand Down Expand Up @@ -42,8 +44,8 @@ docker-compose-up: generate-mocks grpc-proto docker-build
TAG=${TAG} docker-compose up --remove-orphans

client:
go get github.com/fullstorydev/grpcui/...
go install github.com/fullstorydev/grpcui/cmd/grpcui@latest
# go get github.com/fullstorydev/grpcui/...
# go install github.com/fullstorydev/grpcui/cmd/grpcui@latest
grpcui -plaintext -proto pkg/chat_api/ocp-chat-api.proto 127.0.0.1:5300

all: generate-mocks grpc-proto lint test docker-build docker-run
all: grpc-proto deps build generate-mocks lint test docker-build
7 changes: 7 additions & 0 deletions cmd/ocp-chat-api/conf.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ LOG_LEVEL=debug

HTTP_ADDR=:8888
GRPC_ADDR=:5300

KAFKA_HOST=kafka-1
KAFKA_PORT=9092
KAFKA_TOPIC=chats

STORAGE_FLUSHER_PERIOD=1s
STORAGE_FLUSHER_CAPACITY=3
40 changes: 37 additions & 3 deletions cmd/ocp-chat-api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ const (
defaultSQLMaxAllowedPacket = 4096
)

const (
defaultKafkaTopic = "chats"
defaultKafkaHost = "kafka-1"
defaultKafkaPort = "9092"
)

const (
defaultStorageFlusherPeriod = 10 * time.Second
defaultStorageFlusherCapacity = 1000
)

const (
defaultSQLHost = "postgres"
defaultSQLPort = 5432
Expand Down Expand Up @@ -43,10 +54,26 @@ type DatabaseConfig struct {
MigrationDBVersion uint `envconfig:"MIGRATION_DB_VERSION"`
}

// KafkaConfig holds connection settings for the Kafka queue the service
// consumes chat messages from. Host/Port/Topic are combined by the caller
// into a broker address and consumer subscription (see main.go).
//
// NOTE(review): the original struct also carried MigrationsURL /
// MigrationRun / MigrationDBVersion fields copy-pasted from DatabaseConfig;
// they duplicated the MIGRATION_* envconfig bindings and have no meaning
// for Kafka, so they are removed here.
type KafkaConfig struct {
	Host  string `envconfig:"KAFKA_HOST" required:"true"`
	Port  string `envconfig:"KAFKA_PORT"`
	Topic string `envconfig:"KAFKA_TOPIC" required:"true"`
}

// Config aggregates all service configuration, populated from environment
// variables via envconfig (see conf.env for the expected variables).
//
// NOTE(review): the diff rendering left both the pre- and post-change field
// lists in place, which would not compile (duplicate HTTPAddr/GRPCAddr/
// DatabaseCfg declarations); this is the reconstructed post-change struct.
type Config struct {
	HTTPAddr    string `envconfig:"HTTP_ADDR"` // HTTP listener address
	GRPCAddr    string `envconfig:"GRPC_ADDR"` // gRPC API listener address
	DatabaseCfg DatabaseConfig
	KafkaCfg    KafkaConfig

	// StorageFlusherPeriod is how often the saver flushes buffered chats
	// to persistent storage.
	StorageFlusherPeriod time.Duration `envconfig:"STORAGE_FLUSHER_PERIOD"`
	// StorageFlusherCapacity is the maximum number of chats the saver
	// buffers before a flush is forced.
	StorageFlusherCapacity int64 `envconfig:"STORAGE_FLUSHER_CAPACITY"`
}

func NewDefaultConfig() *Config {
Expand All @@ -68,5 +95,12 @@ func NewDefaultConfig() *Config {
MigrationRun: defaultMigrationRun,
MigrationDBVersion: defaultMigrationDBVersion,
},
KafkaCfg: KafkaConfig{
Host: defaultKafkaHost,
Port: defaultKafkaPort,
Topic: defaultKafkaTopic,
},
StorageFlusherPeriod: defaultStorageFlusherPeriod,
StorageFlusherCapacity: defaultStorageFlusherCapacity,
}
}
94 changes: 75 additions & 19 deletions cmd/ocp-chat-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package main

import (
"context"
"github.com/ozoncp/ocp-chat-api/internal/db"
"github.com/ozoncp/ocp-chat-api/internal/saver"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/Shopify/sarama"
"github.com/ozoncp/ocp-chat-api/internal/chat_queue"
"github.com/ozoncp/ocp-chat-api/internal/db"
"github.com/ozoncp/ocp-chat-api/internal/saver"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -39,12 +43,13 @@ func Run() error {

defaultLogger.Info().Msgf("started service %v", os.Args[0])

chat_repo.InitMetrics()
cfg := NewDefaultConfig()
if err := envconfig.Process("", cfg); err != nil {
return errors.Wrap(err, "read config from env")
}

// persistent storage interaction
// persistent storage for chats in Postgres
ctx := context.Background()
ctx = defaultLogger.WithContext(ctx)

Expand Down Expand Up @@ -72,35 +77,46 @@ func Run() error {

chatStorage := chat_repo.NewPostgresRepo(sqlDB)

// our i/o channel with chat objects
chatQueue := chat_repo.NewRepoInMemory() // future Kafka
// our queue Kafka
kafkaAddr := cfg.KafkaCfg.Host + ":" + cfg.KafkaCfg.Port
//brokers := []string{kafkaAddr}
//producer, err := newProducer(brokers)
//if err != nil {
// return errors.Wrap(err, "create kafka producer")
//}

consumer, err := sarama.NewConsumer([]string{kafkaAddr}, nil)
if err != nil {
return errors.Wrap(err, "new consumer")
}
// TODO: defer consumer.Close() here so broker connections are released on shutdown.

// statistics module
statisticsRepo := chat_repo.NewRepoInMemory()
chatQueue := chat_queue.NewKafkaConsumer(consumer, 4, cfg.KafkaCfg.Topic)

statisticsFlusherDeps := chat_flusher.Deps{
ChunkSize: 1,
storageRepoFlusherDeps := chat_flusher.Deps{
ChunkSize: 2,
}

statisticsFlusher := chat_flusher.NewChatFlusher(statisticsFlusherDeps)
storageRepoFlusher := chat_flusher.NewChatFlusher(storageRepoFlusherDeps)

statisticSaverDeps := &saver.Deps{
storageSaverDeps := &saver.Deps{
Capacity: 1000,
FlusherHere: statisticsFlusher,
Repository: statisticsRepo,
FlushPeriod: 10 * time.Second,
FlusherHere: storageRepoFlusher,
Repository: chatStorage,
FlushPeriod: cfg.StorageFlusherPeriod,
Strategy: saver.RemoveOldest,
}
statisticsSaver := saver.New(statisticSaverDeps)
storageSaver := saver.New(storageSaverDeps)

serviceDeps := &chat_service.Deps{
StorageRepo: chatStorage,
QueueRepo: chatQueue,
StatisticsSaver: statisticsSaver,
StorageRepo: chatStorage,
QueueConsumer: chatQueue,
StorageRepoSaver: storageSaver,
}

chatService := chat_service.New(serviceDeps)
chatAPI := chat_api.New(chatService)

// api
listener, err := net.Listen(defaultTransportProtocol, cfg.GRPCAddr)
if err != nil {
Expand All @@ -118,6 +134,14 @@ func Run() error {
return WaitInterruptFromOS(ctx)
})

runner.Go(func() error {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(":1337", nil); err != nil {
return errors.Wrap(err, "metrics server")
}
return nil
})

runner.Go(func() error {
logger.Info().Msg("Start serving grpc api")
if err := grpcServer.Serve(listener); err != nil {
Expand All @@ -126,6 +150,20 @@ func Run() error {
return nil
})

runner.Go(func() error {
if err = chatQueue.Run(ctx); err != nil {
return errors.Wrap(err, "kafka chat queue run")
}
return nil
})

runner.Go(func() error {
if err = storageSaver.Run(ctx); err != nil {
return errors.Wrap(err, "storage saver run")
}
return nil
})

runner.Go(func() error {
<-ctx.Done()
logger.Info().Msg("context done; launching graceful stop of grpc server")
Expand Down Expand Up @@ -176,3 +214,21 @@ func DefaultOSSignals() []os.Signal {
// InterruptedFromOS reports whether err was caused by an OS interrupt
// signal (ErrOSSignalInterrupt), unwrapping wrapped errors via errors.Is.
func InterruptedFromOS(err error) bool {
	return errors.Is(err, ErrOSSignalInterrupt)
}

// newProducer creates a synchronous Kafka producer for the given broker
// addresses. Partitions are chosen at random per message, and a send is
// acknowledged only after all in-sync replicas have received it.
func newProducer(brokers []string) (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.RequiredAcks = sarama.WaitForAll
	// SyncProducer requires Return.Successes = true; sarama panics otherwise.
	config.Producer.Return.Successes = true
	return sarama.NewSyncProducer(brokers, config)
}

// prepareMessage wraps a plain string payload into a producer message for
// the given topic. Partition -1 delegates the choice of partition to the
// producer's configured partitioner.
func prepareMessage(topic, mess string) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic:     topic,
		Partition: -1,
		Value:     sarama.StringEncoder(mess),
	}
}
37 changes: 36 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ services:
ports:
- 8888:80
- 5300:5300
- 1337:1337

depends_on:
- db
# todo return them!
# - zoo
# - kafka-1

networks:
- backend
Expand All @@ -26,9 +30,40 @@ services:
POSTGRES_PASSWORD: example
POSTGRES_DB: chat_db
volumes:
- /Users/vvakhlyn/go/src/github.com/letsdoitnow7777/ocp-chat-api/third_party/db/postgres_data:/var/lib/postgresql
- ${PWD}/third_party/db/postgres_data:/var/lib/postgresql
ports:
- 5432:5432

networks:
- backend

zoo:
image: zookeeper:3.5.6
hostname: test-zoo
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=test-zoo:2888:3888;2181
networks:
- backend

kafka-1:
image: confluentinc/cp-kafka:6.2.0
ports:
- "19094:19094"
environment:
KAFKA_LISTENERS: INTERNAL://kafka-1:9092,OUTSIDE://kafka-1:9094,FOR_HOST://kafka-1:19094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9092,OUTSIDE://kafka-1:9094,FOR_HOST://localhost:19094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT,FOR_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zoo
networks:
- backend

Expand Down
Loading