diff --git a/argocd/helm-udaconnect-prod.yaml b/argocd/helm-udaconnect-prod.yaml new file mode 100644 index 000000000..6ef1410d7 --- /dev/null +++ b/argocd/helm-udaconnect-prod.yaml @@ -0,0 +1,17 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: techtrends-prod + namespace: argocd +spec: + destination: + namespace: default + server: https://kubernetes.default.svc + project: default + source: + helm: + valueFiles: + - values-prod.yaml + path: helm + repoURL: https://github.com/kydq2022/nd064-c2-message-passing-projects-starter + targetRevision: HEAD \ No newline at end of file diff --git a/db/2020-08-15_init-db.sql b/db/2020-08-15_init-db.sql index bd0e6abb4..1a869533f 100644 --- a/db/2020-08-15_init-db.sql +++ b/db/2020-08-15_init-db.sql @@ -10,8 +10,7 @@ CREATE TABLE location ( id SERIAL PRIMARY KEY, person_id INT NOT NULL, coordinate GEOMETRY NOT NULL, - creation_time TIMESTAMP NOT NULL DEFAULT NOW(), - FOREIGN KEY (person_id) REFERENCES person(id) + creation_time TIMESTAMP NOT NULL DEFAULT NOW() ); CREATE INDEX coordinate_idx ON location (coordinate); CREATE INDEX creation_time_idx ON location (creation_time); diff --git a/deployment/kafka-configmap.yaml b/deployment/kafka-configmap.yaml new file mode 100644 index 000000000..74192aa4a --- /dev/null +++ b/deployment/kafka-configmap.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: kafka-env +data: + KAFKA_BOOTSTRAP_SERVERS: "kafka-service:9092" + KAFKA_TOPIC_PERSON: "person-topic" + KAFKA_TOPIC_LOCATION: "location-topic" \ No newline at end of file diff --git a/deployment/kafka.yaml b/deployment/kafka.yaml new file mode 100644 index 000000000..3e6438f06 --- /dev/null +++ b/deployment/kafka.yaml @@ -0,0 +1,35 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-kraft +spec: + replicas: 1 + selector: + matchLabels: + app: kafka-kraft + template: + metadata: + labels: + app: kafka-kraft + spec: + containers: + - name: kafka-kraft + image: confluentinc/confluent-local:7.4.0 + ports: + - containerPort: 9092 + env: + - name: KAFKA_ADVERTISED_LISTENERS + value: PLAINTEXT://kafka-kraft:29092,PLAINTEXT_HOST://kafka-service:9092 + +--- +apiVersion: v1 +kind: Service +metadata: + name: kafka-service +spec: + selector: + app: kafka-kraft + ports: + - protocol: TCP + port: 9092 # Port to expose on the service + targetPort: 9092 # Port your application is listening on diff --git a/deployment/person-rpc.yaml b/deployment/person-rpc.yaml new file mode 100644 index 000000000..f3e2e1a2d --- /dev/null +++ b/deployment/person-rpc.yaml @@ -0,0 +1,62 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + service: person-rpc + name: person-rpc +spec: + ports: + - name: "50051" + port: 50051 + targetPort: 50051 + selector: + service: person-rpc + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: person-rpc + name: person-rpc +spec: + replicas: 1 + selector: + matchLabels: + service: person-rpc + template: + metadata: + labels: + service: person-rpc + spec: + containers: + - image: kydq2022/nd064-c2-person-rpc:latest + name: person-rpc + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + restartPolicy: Always diff --git a/deployment/postgres.yaml b/deployment/postgres.yaml index debd18781..6dfa3bda3 100644 --- a/deployment/postgres.yaml +++ b/deployment/postgres.yaml @@ -58,7 +58,8 @@ spec: containers: - name: postgres image: postgis/postgis:12-2.5-alpine - imagePullPolicy: "IfNotPresent" + imagePullPolicy: Always + restartPolicy: Always ports: - containerPort: 5432 env: diff --git a/deployment/udaconnect-api.yaml b/deployment/udaconnect-api.yaml index e62dcbd0e..fd85cdd73 100644 --- a/deployment/udaconnect-api.yaml +++ b/deployment/udaconnect-api.yaml @@ -9,7 +9,6 @@ spec: - name: "5000" port: 5000 targetPort: 5000 - nodePort: 30001 selector: service: udaconnect-api type: NodePort diff --git a/deployment/udaconnect-app.yaml b/deployment/udaconnect-app.yaml index 26a0ca5dc..6ddb71c44 100644 --- a/deployment/udaconnect-app.yaml +++ b/deployment/udaconnect-app.yaml @@ -9,7 +9,6 @@ spec: - name: "3000" port: 3000 targetPort: 3000 - nodePort: 30000 selector: service: udaconnect-app type: NodePort @@ -31,7 +30,7 @@ spec: service: udaconnect-app spec: containers: - - image: udacity/nd064-udaconnect-app:latest + - image: kydq2022/nd064-c2-udaconnect-app:latest name: udaconnect-app imagePullPolicy: Always resources: @@ -41,4 +40,9 @@ spec: limits: memory: "256Mi" cpu: "256m" + env: + - name: PERSON_API + value: "person-api:5000" + - name: CONNECTION_API + value: "person-api:5000" restartPolicy: Always diff --git a/deployment/udaconnect-connection.yaml b/deployment/udaconnect-connection.yaml new file mode 100644 index 000000000..30043ec91 --- /dev/null +++ b/deployment/udaconnect-connection.yaml @@ -0,0 +1,65 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + service: connection-api + name: connection-api +spec: + ports: + - name: "5000" + port: 5000 + targetPort: 5000 + selector: + service: connection-api + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: connection-api + name: connection-api +spec: + replicas: 1 + selector: + matchLabels: + service: connection-api + template: + metadata: + labels: + service: connection-api + spec: + containers: + - image: kydq2022/nd064-c2-connection-api:latest + name: connection-api + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + # gRPC + - name: GRPC_SERVER_ADDRESS + value: "person-rpc:50051" + restartPolicy: Always diff --git a/deployment/udaconnect-location-consumer.yaml b/deployment/udaconnect-location-consumer.yaml new file mode 100644 index 000000000..8fd182edc --- /dev/null +++ b/deployment/udaconnect-location-consumer.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: location-consumer + name: location-consumer +spec: + replicas: 1 + selector: + matchLabels: + service: location-consumer + template: + metadata: + labels: + service: location-consumer + spec: + containers: + - image: kydq2022/nd064-c2-location-consumer:latest + name: location-consumer + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + # Kafka + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_BOOTSTRAP_SERVERS + - name: KAFKA_TOPIC + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_TOPIC_LOCATION + restartPolicy: Always diff --git a/deployment/udaconnect-location.yaml b/deployment/udaconnect-location.yaml new file mode 100644 index 000000000..66151f962 --- /dev/null +++ b/deployment/udaconnect-location.yaml @@ -0,0 +1,73 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + service: location-api + name: location-api +spec: + ports: + - name: "5000" + port: 5000 + targetPort: 5000 + selector: + service: location-api + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: location-api + name: location-api +spec: + replicas: 1 + selector: + matchLabels: + service: location-api + template: + metadata: + labels: + service: location-api + spec: + containers: + - image: kydq2022/nd064-c2-location-api:latest + name: location-api + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + # Kafka + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_BOOTSTRAP_SERVERS + - name: KAFKA_TOPIC + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_TOPIC_LOCATION + restartPolicy: Always diff --git a/deployment/udaconnect-person-consumer.yaml b/deployment/udaconnect-person-consumer.yaml new file mode 100644 index 000000000..9bcb58978 --- /dev/null +++ b/deployment/udaconnect-person-consumer.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: person-consumer + name: person-consumer +spec: + replicas: 1 + selector: + matchLabels: + service: person-consumer + template: + metadata: + labels: + service: person-consumer + spec: + containers: + - image: kydq2022/nd064-c2-person-consumer:latest + name: person-consumer + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + # Kafka + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_BOOTSTRAP_SERVERS + - name: KAFKA_TOPIC + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_TOPIC_PERSON + restartPolicy: Always diff --git a/deployment/udaconnect-person.yaml b/deployment/udaconnect-person.yaml new file mode 100644 index 000000000..dc24333dc --- /dev/null +++ b/deployment/udaconnect-person.yaml @@ -0,0 +1,73 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + service: person-api + name: person-api +spec: + ports: + - name: "5000" + port: 5000 + targetPort: 5000 + selector: + service: person-api + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + service: person-api + name: person-api +spec: + replicas: 1 + selector: + matchLabels: + service: person-api + template: + metadata: + labels: + service: person-api + spec: + containers: + - image: kydq2022/nd064-c2-person-api:latest + name: person-api + imagePullPolicy: Always + env: + - name: DB_USERNAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_USERNAME + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: db-secret + key: DB_PASSWORD + - name: DB_NAME + valueFrom: + configMapKeyRef: + name: db-env + key: DB_NAME + - name: DB_HOST + valueFrom: + configMapKeyRef: + name: db-env + key: DB_HOST + - name: DB_PORT + valueFrom: + configMapKeyRef: + name: db-env + key: DB_PORT + # Kafka + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_BOOTSTRAP_SERVERS + - name: KAFKA_TOPIC + valueFrom: + configMapKeyRef: + name: kafka-env + key: KAFKA_TOPIC_PERSON + restartPolicy: Always diff --git a/docker-cmd.md b/docker-cmd.md new file mode 100644 index 000000000..b8a705eec --- /dev/null +++ b/docker-cmd.md @@ -0,0 +1,41 @@ +```shell + +# connection +sudo docker build -t kydq2022/nd064-c2-connection-api:latest modules/connection +sudo docker push kydq2022/nd064-c2-connection-api:latest + +# location +sudo docker build -t kydq2022/nd064-c2-location-api:latest modules/location +sudo docker push kydq2022/nd064-c2-location-api:latest + +sudo docker build -t kydq2022/nd064-c2-location-consumer:latest modules/location_consumer +sudo docker push kydq2022/nd064-c2-location-consumer:latest + + +# person +sudo docker build -t kydq2022/nd064-c2-person-api:latest modules/person +sudo docker push kydq2022/nd064-c2-person-api:latest + +sudo docker build -t kydq2022/nd064-c2-person-consumer:latest modules/person_consumer +sudo docker push kydq2022/nd064-c2-person-consumer:latest + +sudo docker build -t kydq2022/nd064-c2-person-rpc:latest modules/person_rpc +sudo docker push kydq2022/nd064-c2-person-rpc:latest + +sudo docker build -t kydq2022/nd064-c2-udaconnect-app:latest modules/frontend +sudo docker push kydq2022/nd064-c2-udaconnect-app:latest + +``` + +``` +sudo docker run -p 5000:5001 \ + -e DB_USERNAME="ct_admin" \ + -e DB_NAME="geoconnections" \ + -e DB_PASSWORD="wowimsosecure" \ + -e DB_HOST="localhost" \ + -e DB_PORT="5432" \ + -e GRPC_SERVER_ADDRESS="localhost:50051" \ + -e KAFKA_BOOTSTRAP_SERVERS="localhosxt:9092" \ + kydq2022/nd064-c2-location-api:latest + +``` \ No newline at end of file diff --git a/docs/architecture_decisions.md b/docs/architecture_decisions.md new file mode 100644 index 000000000..6dd0a0944 --- /dev/null +++ b/docs/architecture_decisions.md @@ -0,0 +1,38 @@ +# Architecture Decisions Document + +## Date: [Date] + +## Context + +Our system involves the development of a distributed application with multiple services, each serving a specific purpose. The primary components include a frontend (fe) that serves as a REST API, a `connection` service that retrieves data from a `person-rpc` service using gRPC, and a mechanism for creating `person` and `location` entities using a Kafka topic. + +![Alt text](system-architecture.png) + +## Decision 1: Frontend as REST API + +### Considerations + +- REST APIs are widely adopted and well-suited for exposing services over HTTP. +- Simplicity and ease of integration with various clients. + +### Decision + +The frontend (`fe`) will be designed as a REST API. This decision is based on the familiarity of RESTful principles and the ease with which clients can interact with the system using standard HTTP methods. + +## Decision 2: gRPC for `connection` to `person-rpc` + +### Considerations + +- `connection` needs to retrieve data from `person-rpc`. +- gRPC provides a high-performance, language-agnostic RPC framework. + +### Decision + +`connection` will communicate with `person-rpc` using gRPC. This decision is motivated by the efficiency and ease of use offered by gRPC, enabling strongly-typed communication and efficient serialization. + +## Decision 3: Kafka Topic for Creating `person` and `location` + +### Considerations + +- Asynchronous communication for creating entities. +- Kafka provides a distributed, fault-tolerant, and scalable message broker. diff --git a/docs/grpc.md b/docs/grpc.md new file mode 100644 index 000000000..bd98b9672 --- /dev/null +++ b/docs/grpc.md @@ -0,0 +1,29 @@ +# ENV +```shell +#!/bin/bash + +export DB_USERNAME="ct_admin" +export DB_NAME="geoconnections" +export DB_PASSWORD="wowimsosecure" +export DB_HOST="localhost" +export DB_PORT="5432" + +# gRPC +export GRPC_SERVER_ADDRESS="localhost:50051" + +# Kafka +export GRPC_SERVER_ADDRESS="localhost:9092" +``` + +## Start RPC server +```shell + +python app/server.py + +``` +## Run test +```shell + +python app/client.py + +``` diff --git a/docs/openapi.yaml b/docs/openapi.yaml new file mode 100644 index 000000000..14f3c033a --- /dev/null +++ b/docs/openapi.yaml @@ -0,0 +1,96 @@ +openapi: 3.1.0 +info: + title: "UdaConnect API" + version: "0.1.0" +paths: + /api/persons/{person_id}/connection: + get: + tags: + - UdaConnect + operationId: get_connection_data_resource + parameters: + - name: person_id + in: path + required: true + schema: + type: string + - name: distance + in: query + description: Proximity to a given user in meters + schema: + type: string + - name: end_date + in: query + description: Upper bound of date range + schema: + type: string + - name: start_date + in: query + description: Lower bound of date range + schema: + type: string + responses: + '200': + description: Success + + /api/persons: + post: + tags: + - UdaConnect + operationId: create_person + requestBody: + description: Person data to be created + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Person' + responses: + '201': + description: Created + '400': + description: Bad Request + + /api/locations: + post: + tags: + - UdaConnect + operationId: create_location + requestBody: + description: Location data to be created + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Location' + responses: + '201': + description: Created + '400': + description: Bad Request + +components: + schemas: + Person: + type: object + properties: + first_name: + type: string + last_name: + type: string + company_name: + type: string + + Location: + type: object + properties: + latitude: + type: number + longitude: + type: number + + responses: + ParseError: + description: When a mask can't be parsed + MaskError: + description: When any error occurs on mask diff --git a/docs/pods_screenshot.png b/docs/pods_screenshot.png new file mode 100644 index 000000000..36e48e208 Binary files /dev/null and b/docs/pods_screenshot.png differ diff --git a/docs/postman.json b/docs/postman.json new file mode 100644 index 000000000..6648033c6 --- /dev/null +++ b/docs/postman.json @@ -0,0 +1,39 @@ +{ + "info": { + "_postman_id": "unique_id", + "name": "Your Collection Name", + "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" + }, + "item": [ + { + "name": "Create Location", + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"latitude\": \"-122.290524\",\n \"longitude\": \"37.553441\",\n \"creation_time\": \"2020-08-18T10:37:06\",\n \"person_id\": 29\n}" + }, + "url": { + "raw": "http://localhost:5000/api/locations", + "protocol": "http", + "host": [ + "localhost" + ], + "port": "5000", + "path": [ + "api", + "locations" + ] + } + }, + "response": [] + } + ] + } + \ No newline at end of file diff --git a/docs/services_screenshot.png b/docs/services_screenshot.png new file mode 100644 index 000000000..0be7b5d38 Binary files /dev/null and b/docs/services_screenshot.png differ diff --git a/docs/system-architecture.png b/docs/system-architecture.png new file mode 100644 index 000000000..24bbfb513 Binary files /dev/null and b/docs/system-architecture.png differ diff --git a/env-dev b/env-dev new file mode 100755 index 000000000..d0dc4a1d2 --- /dev/null +++ b/env-dev @@ -0,0 +1,13 @@ +#!/bin/bash + +export DB_USERNAME="ct_admin" +export DB_NAME="geoconnections" +export DB_PASSWORD="wowimsosecure" +export DB_HOST="localhost" +export DB_PORT="5432" + +# gRPC +export GRPC_SERVER_ADDRESS="localhost:50051" + +# Kafka +export GRPC_SERVER_ADDRESS="localhost:9092" diff --git a/modules/api/app/udaconnect/controllers.py b/modules/api/app/udaconnect/controllers.py deleted file mode 100644 index 0b714680b..000000000 --- a/modules/api/app/udaconnect/controllers.py +++ /dev/null @@ -1,83 +0,0 @@ -from datetime import datetime - -from app.udaconnect.models import Connection, Location, Person -from app.udaconnect.schemas import ( - ConnectionSchema, - LocationSchema, - PersonSchema, -) -from app.udaconnect.services import ConnectionService, LocationService, PersonService -from flask import request -from flask_accepts import accepts, responds -from flask_restx import Namespace, Resource -from typing import Optional, List - -DATE_FORMAT = "%Y-%m-%d" - -api = Namespace("UdaConnect", description="Connections via geolocation.") # noqa - - -# TODO: This needs better exception handling - - -@api.route("/locations") -@api.route("/locations/") -@api.param("location_id", "Unique ID for a given Location", _in="query") -class LocationResource(Resource): - @accepts(schema=LocationSchema) - @responds(schema=LocationSchema) - def post(self) -> Location: - request.get_json() - location: Location = LocationService.create(request.get_json()) - return location - - @responds(schema=LocationSchema) - def get(self, location_id) -> Location: - location: Location = LocationService.retrieve(location_id) - return location - - -@api.route("/persons") -class PersonsResource(Resource): - @accepts(schema=PersonSchema) - @responds(schema=PersonSchema) - def post(self) -> Person: - payload = request.get_json() - new_person: Person = PersonService.create(payload) - return new_person - - @responds(schema=PersonSchema, many=True) - def get(self) -> List[Person]: - persons: List[Person] = PersonService.retrieve_all() - return persons - - -@api.route("/persons/") -@api.param("person_id", "Unique ID for a given Person", _in="query") -class PersonResource(Resource): - @responds(schema=PersonSchema) - def get(self, person_id) -> Person: - person: Person = PersonService.retrieve(person_id) - return person - - -@api.route("/persons//connection") -@api.param("start_date", "Lower bound of date range", _in="query") -@api.param("end_date", "Upper bound of date range", _in="query") -@api.param("distance", "Proximity to a given user in meters", _in="query") -class ConnectionDataResource(Resource): - @responds(schema=ConnectionSchema, many=True) - def get(self, person_id) -> ConnectionSchema: - start_date: datetime = datetime.strptime( - request.args["start_date"], DATE_FORMAT - ) - end_date: datetime = datetime.strptime(request.args["end_date"], DATE_FORMAT) - distance: Optional[int] = request.args.get("distance", 5) - - results = ConnectionService.find_contacts( - person_id=person_id, - start_date=start_date, - end_date=end_date, - meters=distance, - ) - return results diff --git a/modules/api/Dockerfile b/modules/connection/Dockerfile similarity index 100% rename from modules/api/Dockerfile rename to modules/connection/Dockerfile diff --git a/modules/api/app/__init__.py b/modules/connection/app/__init__.py similarity index 100% rename from modules/api/app/__init__.py rename to modules/connection/app/__init__.py diff --git a/modules/api/app/config.py b/modules/connection/app/config.py similarity index 100% rename from modules/api/app/config.py rename to modules/connection/app/config.py diff --git a/modules/api/app/routes.py b/modules/connection/app/routes.py similarity index 100% rename from modules/api/app/routes.py rename to modules/connection/app/routes.py diff --git a/modules/api/app/udaconnect/__init__.py b/modules/connection/app/udaconnect/__init__.py similarity index 100% rename from modules/api/app/udaconnect/__init__.py rename to modules/connection/app/udaconnect/__init__.py diff --git a/modules/connection/app/udaconnect/controllers.py b/modules/connection/app/udaconnect/controllers.py new file mode 100644 index 000000000..4e2d0ff71 --- /dev/null +++ b/modules/connection/app/udaconnect/controllers.py @@ -0,0 +1,40 @@ +from datetime import datetime + +from app.udaconnect.models import Connection +from app.udaconnect.schemas import ( + ConnectionSchema +) +from app.udaconnect.services import ConnectionService +from flask import request +from flask_accepts import accepts, responds +from flask_restx import Namespace, Resource +from typing import Optional, List + +DATE_FORMAT = "%Y-%m-%d" + +api = Namespace("UdaConnect", description="Connections via geolocation.") # noqa + + +# TODO: This needs better exception handling + + +@api.route("/persons//connection") +@api.param("start_date", "Lower bound of date range", _in="query") +@api.param("end_date", "Upper bound of date range", _in="query") +@api.param("distance", "Proximity to a given user in meters", _in="query") +class ConnectionDataResource(Resource): + @responds(schema=ConnectionSchema, many=True) + def get(self, person_id) -> ConnectionSchema: + start_date: datetime = datetime.strptime( + request.args["start_date"], DATE_FORMAT + ) + end_date: datetime = datetime.strptime(request.args["end_date"], DATE_FORMAT) + distance: Optional[int] = request.args.get("distance", 5) + + results = ConnectionService.find_contacts( + person_id=person_id, + start_date=start_date, + end_date=end_date, + meters=distance, + ) + return results diff --git a/modules/api/app/udaconnect/models.py b/modules/connection/app/udaconnect/models.py similarity index 100% rename from modules/api/app/udaconnect/models.py rename to modules/connection/app/udaconnect/models.py diff --git a/modules/api/app/udaconnect/schemas.py b/modules/connection/app/udaconnect/schemas.py similarity index 100% rename from modules/api/app/udaconnect/schemas.py rename to modules/connection/app/udaconnect/schemas.py diff --git a/modules/api/app/udaconnect/services.py b/modules/connection/app/udaconnect/services.py similarity index 62% rename from modules/api/app/udaconnect/services.py rename to modules/connection/app/udaconnect/services.py index c248c31b2..220404374 100644 --- a/modules/api/app/udaconnect/services.py +++ b/modules/connection/app/udaconnect/services.py @@ -7,9 +7,14 @@ from app.udaconnect.schemas import ConnectionSchema, LocationSchema, PersonSchema from geoalchemy2.functions import ST_AsText, ST_Point from sqlalchemy.sql import text +import os +import grpc +import persons_pb2 +import persons_pb2_grpc -logging.basicConfig(level=logging.WARNING) -logger = logging.getLogger("udaconnect-api") + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("connection-api") class ConnectionService: @@ -30,7 +35,13 @@ def find_contacts(person_id: int, start_date: datetime, end_date: datetime, mete ).all() # Cache all users in memory for quick lookup - person_map: Dict[str, Person] = {person.id: person for person in PersonService.retrieve_all()} + grpc_server_address = os.environ["GRPC_SERVER_ADDRESS"] + channel = grpc.insecure_channel(grpc_server_address) + stub = persons_pb2_grpc.PersonServiceStub(channel=channel) + request = persons_pb2.RetrieveAllPersonsRequest() + list_persons: persons_pb2.ListPerson = stub.RetrieveAllPersons(request) + person_map: Dict[str, Person] = {person.id: person for person in list_persons.persons} + logging.info(person_map) # Prepare arguments for queries data = [] @@ -79,56 +90,3 @@ def find_contacts(person_id: int, start_date: datetime, end_date: datetime, mete ) return result - - -class LocationService: - @staticmethod - def retrieve(location_id) -> Location: - location, coord_text = ( - db.session.query(Location, Location.coordinate.ST_AsText()) - .filter(Location.id == location_id) - .one() - ) - - # Rely on database to return text form of point to reduce overhead of conversion in app code - location.wkt_shape = coord_text - return location - - @staticmethod - def create(location: Dict) -> Location: - validation_results: Dict = LocationSchema().validate(location) - if validation_results: - logger.warning(f"Unexpected data format in payload: {validation_results}") - raise Exception(f"Invalid payload: {validation_results}") - - new_location = Location() - new_location.person_id = location["person_id"] - new_location.creation_time = location["creation_time"] - new_location.coordinate = ST_Point(location["latitude"], location["longitude"]) - db.session.add(new_location) - db.session.commit() - - return new_location - - -class PersonService: - @staticmethod - def create(person: Dict) -> Person: - new_person = Person() - new_person.first_name = person["first_name"] - new_person.last_name = person["last_name"] - new_person.company_name = person["company_name"] - - db.session.add(new_person) - db.session.commit() - - return new_person - - @staticmethod - def retrieve(person_id: int) -> Person: - person = db.session.query(Person).get(person_id) - return person - - @staticmethod - def retrieve_all() -> List[Person]: - return db.session.query(Person).all() diff --git a/modules/connection/persons_pb2.py b/modules/connection/persons_pb2.py new file mode 100644 index 000000000..c328841be --- /dev/null +++ b/modules/connection/persons_pb2.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: persons.proto +# Protobuf Python Version: 4.25.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rpersons.proto\x12\nudaconnect\"Q\n\x06Person\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\x14\n\x0c\x63ompany_name\x18\x04 \x01(\t\"1\n\nListPerson\x12#\n\x07persons\x18\x01 \x03(\x0b\x32\x12.udaconnect.Person\"\x1b\n\x19RetrieveAllPersonsRequest2d\n\rPersonService\x12S\n\x12RetrieveAllPersons\x12%.udaconnect.RetrieveAllPersonsRequest\x1a\x16.udaconnect.ListPersonb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'persons_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_PERSON']._serialized_start=29 + _globals['_PERSON']._serialized_end=110 + _globals['_LISTPERSON']._serialized_start=112 + _globals['_LISTPERSON']._serialized_end=161 + _globals['_RETRIEVEALLPERSONSREQUEST']._serialized_start=163 + _globals['_RETRIEVEALLPERSONSREQUEST']._serialized_end=190 + _globals['_PERSONSERVICE']._serialized_start=192 + _globals['_PERSONSERVICE']._serialized_end=292 +# @@protoc_insertion_point(module_scope) diff --git a/modules/connection/persons_pb2_grpc.py b/modules/connection/persons_pb2_grpc.py new file mode 100644 index 000000000..3f496f1ec --- /dev/null +++ b/modules/connection/persons_pb2_grpc.py @@ -0,0 +1,66 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import persons_pb2 as persons__pb2 + + +class PersonServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.RetrieveAllPersons = channel.unary_unary( + '/udaconnect.PersonService/RetrieveAllPersons', + request_serializer=persons__pb2.RetrieveAllPersonsRequest.SerializeToString, + response_deserializer=persons__pb2.ListPerson.FromString, + ) + + +class PersonServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def RetrieveAllPersons(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_PersonServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'RetrieveAllPersons': grpc.unary_unary_rpc_method_handler( + servicer.RetrieveAllPersons, + request_deserializer=persons__pb2.RetrieveAllPersonsRequest.FromString, + response_serializer=persons__pb2.ListPerson.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'udaconnect.PersonService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class PersonService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def RetrieveAllPersons(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/udaconnect.PersonService/RetrieveAllPersons', + persons__pb2.RetrieveAllPersonsRequest.SerializeToString, + persons__pb2.ListPerson.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/modules/connection/requirements.txt b/modules/connection/requirements.txt new file mode 100644 index 000000000..d62172337 --- /dev/null +++ b/modules/connection/requirements.txt @@ -0,0 +1,28 @@ +aniso8601==7.0.0 +attrs==19.1.0 +Click==7.0 +Flask==1.1.1 +flask-accepts==0.10.0 +flask-cors==3.0.8 +Flask-RESTful==0.3.7 +flask-restplus==0.12.1 +Flask-Script==2.0.6 +Flask-SQLAlchemy==2.4.0 +GeoAlchemy2==0.8.4 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jsonschema==3.0.2 +MarkupSafe==1.1.1 +marshmallow==3.7.1 +marshmallow-sqlalchemy==0.23.1 +psycopg2-binary==2.8.5 +pyrsistent==0.16.0 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.15.0 +shapely==1.7.0 +SQLAlchemy==1.3.19 +Werkzeug==0.16.1 +flask-restx==0.2.0 +grpcio==1.60.0 +protobuf \ No newline at end of file diff --git a/modules/api/wsgi.py b/modules/connection/wsgi.py similarity index 100% rename from modules/api/wsgi.py rename to modules/connection/wsgi.py diff --git a/modules/frontend/src/components/Connection.js b/modules/frontend/src/components/Connection.js index d23d1d3f2..85a7d0389 100644 --- a/modules/frontend/src/components/Connection.js +++ b/modules/frontend/src/components/Connection.js @@ -22,7 +22,7 @@ class Connection extends Component { if (personId) { // TODO: endpoint should be abstracted into a config variable fetch( - `http://localhost:30001/api/persons/${personId}/connection?start_date=2020-01-01&end_date=2020-12-30&distance=5` + `http://${process.env.CONNECTION_API}/api/persons/${personId}/connection?start_date=2020-01-01&end_date=2020-12-30&distance=5` ) .then((response) => response.json()) .then((connections) => diff --git a/modules/frontend/src/components/Persons.js b/modules/frontend/src/components/Persons.js index d75500bbb..755fc1ee5 100644 --- a/modules/frontend/src/components/Persons.js +++ b/modules/frontend/src/components/Persons.js @@ -5,7 +5,7 @@ class Persons extends Component { constructor(props) { super(props); // TODO: endpoint should be abstracted into a config variable - this.endpoint_url = "http://localhost:30001/api/persons"; + this.endpoint_url = `http://${process.env.PERSON_API}/api/persons`; this.state = { persons: [], display: null, diff --git a/modules/location/Dockerfile b/modules/location/Dockerfile new file mode 100644 index 000000000..1ef643ff1 --- /dev/null +++ b/modules/location/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.7-alpine + +WORKDIR . + +RUN apk add --no-cache gcc musl-dev linux-headers geos libc-dev postgresql-dev +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +EXPOSE 5000 + +COPY . . +CMD ["flask", "run", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/modules/location/app/__init__.py b/modules/location/app/__init__.py new file mode 100644 index 000000000..627a5c5f6 --- /dev/null +++ b/modules/location/app/__init__.py @@ -0,0 +1,26 @@ +from flask import Flask, jsonify +from flask_cors import CORS +from flask_restx import Api +from flask_sqlalchemy import SQLAlchemy + +db = SQLAlchemy() + + +def create_app(env=None): + from app.config import config_by_name + from app.routes import register_routes + + app = Flask(__name__) + app.config.from_object(config_by_name[env or "test"]) + api = Api(app, title="UdaConnect API", version="0.1.0") + + CORS(app) # Set CORS for development + + register_routes(api, app) + db.init_app(app) + + @app.route("/health") + def health(): + return jsonify("healthy") + + return app diff --git a/modules/location/app/config.py b/modules/location/app/config.py new file mode 100644 index 000000000..827b6a14a --- /dev/null +++ b/modules/location/app/config.py @@ -0,0 +1,58 @@ +import os +from typing import List, Type + +DB_USERNAME = os.environ["DB_USERNAME"] +DB_PASSWORD = os.environ["DB_PASSWORD"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = os.environ["DB_PORT"] +DB_NAME = os.environ["DB_NAME"] + + +class BaseConfig: + CONFIG_NAME = "base" + USE_MOCK_EQUIVALENCY = False + DEBUG = False + SQLALCHEMY_TRACK_MODIFICATIONS = False + + +class DevelopmentConfig(BaseConfig): + CONFIG_NAME = "dev" + SECRET_KEY = os.getenv( + "DEV_SECRET_KEY", "You can't see California without Marlon Widgeto's eyes" + ) + DEBUG = True + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = False + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +class TestingConfig(BaseConfig): + CONFIG_NAME = "test" + SECRET_KEY = os.getenv("TEST_SECRET_KEY", "Thanos did nothing wrong") + DEBUG = True + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = True + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +class ProductionConfig(BaseConfig): + CONFIG_NAME = "prod" + SECRET_KEY = os.getenv("PROD_SECRET_KEY", "I'm Ron Burgundy?") + DEBUG = False + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = False + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +EXPORT_CONFIGS: List[Type[BaseConfig]] = [ + DevelopmentConfig, + TestingConfig, + ProductionConfig, +] +config_by_name = {cfg.CONFIG_NAME: cfg for cfg in EXPORT_CONFIGS} diff --git a/modules/location/app/routes.py b/modules/location/app/routes.py new file mode 100644 index 000000000..c6b1c20eb --- /dev/null +++ b/modules/location/app/routes.py @@ -0,0 +1,5 @@ +def register_routes(api, app, root="api"): + from app.udaconnect import register_routes as attach_udaconnect + + # Add routes + attach_udaconnect(api, app) diff --git a/modules/location/app/udaconnect/__init__.py b/modules/location/app/udaconnect/__init__.py new file mode 100644 index 000000000..5cef3c90b --- /dev/null +++ b/modules/location/app/udaconnect/__init__.py @@ -0,0 +1,8 @@ +from app.udaconnect.models import Connection, Location, Person # noqa +from app.udaconnect.schemas import ConnectionSchema, LocationSchema, PersonSchema # noqa + + +def register_routes(api, app, root="api"): + from app.udaconnect.controllers import api as udaconnect_api + + api.add_namespace(udaconnect_api, path=f"/{root}") diff --git a/modules/location/app/udaconnect/controllers.py b/modules/location/app/udaconnect/controllers.py new file mode 100644 index 000000000..3fbb65f63 --- /dev/null +++ b/modules/location/app/udaconnect/controllers.py @@ -0,0 +1,37 @@ +from datetime import datetime + +from app.udaconnect.models import Connection, Location, Person +from app.udaconnect.schemas import ( + ConnectionSchema, + LocationSchema, + PersonSchema, +) +from app.udaconnect.services import LocationService +from flask import request +from flask_accepts import accepts, responds +from flask_restx import Namespace, Resource +from typing import Optional, List + +DATE_FORMAT = "%Y-%m-%d" + +api = Namespace("UdaConnect", description="Connections via geolocation.") # noqa + + +# TODO: This needs better exception handling + + +@api.route("/locations") +@api.route("/locations/") +@api.param("location_id", "Unique ID for a given Location", _in="query") +class LocationResource(Resource): + @accepts(schema=LocationSchema) + @responds(schema=LocationSchema) + def post(self): + request.get_json() + location = LocationService.create(request.get_json()) + return location + + @responds(schema=LocationSchema) + def get(self, location_id) -> Location: + location: Location = LocationService.retrieve(location_id) + return location diff --git a/modules/location/app/udaconnect/models.py b/modules/location/app/udaconnect/models.py new file mode 100644 index 000000000..94673a9e6 --- /dev/null +++ b/modules/location/app/udaconnect/models.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime + +from app import db # noqa +from geoalchemy2 import Geometry +from geoalchemy2.shape import to_shape +from shapely.geometry.point import Point +from sqlalchemy import BigInteger, Column, Date, DateTime, ForeignKey, Integer, String +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.ext.hybrid import hybrid_property + + +class Person(db.Model): + __tablename__ = "person" + + id = Column(Integer, primary_key=True) + first_name = Column(String, nullable=False) + last_name = Column(String, nullable=False) + company_name = Column(String, nullable=False) + + +class Location(db.Model): + __tablename__ = "location" + + id = Column(BigInteger, primary_key=True) + person_id = Column(Integer, ForeignKey(Person.id), nullable=False) + coordinate = Column(Geometry("POINT"), nullable=False) + creation_time = Column(DateTime, nullable=False, default=datetime.utcnow) + _wkt_shape: str = None + + @property + def wkt_shape(self) -> str: + # Persist binary form into readable text + if not self._wkt_shape: + point: Point = to_shape(self.coordinate) + # normalize WKT returned by to_wkt() from shapely and ST_AsText() from DB + self._wkt_shape = point.to_wkt().replace("POINT ", "ST_POINT") + return self._wkt_shape + + @wkt_shape.setter + def wkt_shape(self, v: str) -> None: + self._wkt_shape = v + + def set_wkt_with_coords(self, lat: str, long: str) -> str: + self._wkt_shape = f"ST_POINT({lat} {long})" + return self._wkt_shape + + @hybrid_property + def longitude(self) -> str: + coord_text = self.wkt_shape + return coord_text[coord_text.find(" ") + 1 : coord_text.find(")")] + + @hybrid_property + def latitude(self) -> str: + coord_text = self.wkt_shape + return coord_text[coord_text.find("(") + 1 : coord_text.find(" ")] + + +@dataclass +class Connection: + location: Location + person: Person diff --git a/modules/location/app/udaconnect/schemas.py b/modules/location/app/udaconnect/schemas.py new file mode 100644 index 000000000..b2ce23961 --- /dev/null +++ b/modules/location/app/udaconnect/schemas.py @@ -0,0 +1,30 @@ +from app.udaconnect.models import Connection, Location, Person +from geoalchemy2.types import Geometry as GeometryType +from marshmallow import Schema, fields +from marshmallow_sqlalchemy.convert import ModelConverter as BaseModelConverter + + +class LocationSchema(Schema): + id = fields.Integer() + person_id = fields.Integer() + longitude = fields.String(attribute="longitude") + latitude = fields.String(attribute="latitude") + creation_time = fields.DateTime() + + class Meta: + model = Location + + +class PersonSchema(Schema): + id = fields.Integer() + first_name = fields.String() + last_name = fields.String() + company_name = fields.String() + + class Meta: + model = Person + + +class ConnectionSchema(Schema): + location = fields.Nested(LocationSchema) + person = fields.Nested(PersonSchema) diff --git a/modules/location/app/udaconnect/services.py b/modules/location/app/udaconnect/services.py new file mode 100644 index 000000000..5027d0343 --- /dev/null +++ b/modules/location/app/udaconnect/services.py @@ -0,0 +1,60 @@ +import logging +from datetime import datetime, timedelta +from typing import Dict, List + +from app import db +from app.udaconnect.models import Connection, Location, Person +from app.udaconnect.schemas import ConnectionSchema, LocationSchema, PersonSchema +from geoalchemy2.functions import ST_AsText, ST_Point +from sqlalchemy.sql import text +import os +import json + +from kafka import KafkaProducer + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("location-api") + +class LocationService: + @staticmethod + def retrieve(location_id) -> Location: + location, coord_text = ( + db.session.query(Location, Location.coordinate.ST_AsText()) + .filter(Location.id == location_id) + .one() + ) + + # Rely on database to return text form of point to reduce overhead of conversion in app code + location.wkt_shape = coord_text + return location + + @staticmethod + def create(location: Dict): + validation_results: Dict = LocationSchema().validate(location) + if validation_results: + logger.warning(f"Unexpected data format in payload: {validation_results}") + raise Exception(f"Invalid payload: {validation_results}") + + kafka_bootstrap_servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"] + logging.info(kafka_bootstrap_servers) + producer_config = { + 'bootstrap_servers': kafka_bootstrap_servers, + 'client_id': 'location-producer', + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } + + producer = KafkaProducer(**producer_config) + creation_time_str = location["creation_time"] + location["creation_time"] = datetime.fromisoformat(creation_time_str) + + location_data = { + 'person_id': location["person_id"], + 'latitude': location["latitude"], + 'longitude': location["longitude"], + 'creation_time': location["creation_time"].isoformat() + } + + kafka_topic = 'location-topic' + producer.send(kafka_topic, value=location_data) + + return location diff --git a/modules/api/requirements.txt b/modules/location/requirements.txt similarity index 97% rename from modules/api/requirements.txt rename to modules/location/requirements.txt index 652e39c33..12de1a10a 100644 --- a/modules/api/requirements.txt +++ b/modules/location/requirements.txt @@ -24,3 +24,4 @@ shapely==1.7.0 SQLAlchemy==1.3.19 Werkzeug==0.16.1 flask-restx==0.2.0 +kafka-python \ No newline at end of file diff --git a/modules/location/wsgi.py b/modules/location/wsgi.py new file mode 100644 index 000000000..63fc43373 --- /dev/null +++ b/modules/location/wsgi.py @@ -0,0 +1,7 @@ +import os + +from app import create_app + +app = create_app(os.getenv("FLASK_ENV") or "test") +if __name__ == "__main__": + app.run(debug=True) diff --git a/modules/location_consumer/Dockerfile b/modules/location_consumer/Dockerfile new file mode 100644 index 000000000..c99e98bbe --- /dev/null +++ b/modules/location_consumer/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.7-alpine + +WORKDIR . + +RUN apk add --no-cache gcc musl-dev linux-headers geos libc-dev postgresql-dev +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . +CMD ["python", "app/app.py"] \ No newline at end of file diff --git a/modules/location_consumer/app/app.py b/modules/location_consumer/app/app.py new file mode 100644 index 000000000..36f24e668 --- /dev/null +++ b/modules/location_consumer/app/app.py @@ -0,0 +1,140 @@ +import logging +from datetime import datetime, timedelta +from typing import Dict, List + +from geoalchemy2.functions import ST_AsText, ST_Point +from sqlalchemy import create_engine, BigInteger, Column, Date, DateTime, ForeignKey, Integer, String +from geoalchemy2 import Geometry +from geoalchemy2.shape import to_shape +from shapely.geometry.point import Point +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from marshmallow import Schema, fields +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.ext.hybrid import hybrid_property + +import os +import json +from kafka import KafkaConsumer + +DB_USERNAME = os.environ["DB_USERNAME"] +DB_PASSWORD = os.environ["DB_PASSWORD"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = os.environ["DB_PORT"] +DB_NAME = os.environ["DB_NAME"] + +SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + +logging.basicConfig(level= logging.INFO) + +logging.info(SQLALCHEMY_DATABASE_URI) + +engine = create_engine(SQLALCHEMY_DATABASE_URI) +Session = sessionmaker(bind=engine) +db = Session() + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("location-consumer") + + +Base = declarative_base() + +class Location(Base): + __tablename__ = "location" + + id = Column(BigInteger, primary_key=True) + person_id = Column(Integer, nullable=False) + coordinate = Column(Geometry("POINT"), nullable=False) + creation_time = Column(DateTime, nullable=False, default=datetime.utcnow) + _wkt_shape: str = None + + @property + def wkt_shape(self) -> str: + # Persist binary form into readable text + if not self._wkt_shape: + point: Point = to_shape(self.coordinate) + # normalize WKT returned by to_wkt() from shapely and ST_AsText() from DB + self._wkt_shape = point.to_wkt().replace("POINT ", "ST_POINT") + return self._wkt_shape + + @wkt_shape.setter + def wkt_shape(self, v: str) -> None: + self._wkt_shape = v + + def set_wkt_with_coords(self, lat: str, long: str) -> str: + self._wkt_shape = f"ST_POINT({lat} {long})" + return self._wkt_shape + + @hybrid_property + def longitude(self) -> str: + coord_text = self.wkt_shape + return coord_text[coord_text.find(" ") + 1 : coord_text.find(")")] + + @hybrid_property + def latitude(self) -> str: + coord_text = self.wkt_shape + return coord_text[coord_text.find("(") + 1 : coord_text.find(" ")] + +class LocationSchema(Schema): + id = fields.Integer() + person_id = fields.Integer() + longitude = fields.String(attribute="longitude") + latitude = fields.String(attribute="latitude") + creation_time = fields.DateTime() + + class Meta: + model = Location + + +class LocationService: + @staticmethod + def create(location: Dict) -> Location: + validation_results: Dict = LocationSchema().validate(location) + if validation_results: + logger.warning(f"Unexpected data format in payload: {validation_results}") + raise Exception(f"Invalid payload: {validation_results}") + + new_location = Location() + new_location.person_id = location["person_id"] + new_location.creation_time = location["creation_time"] + new_location.coordinate = ST_Point(location["latitude"], location["longitude"]) + db.add(new_location) + db.commit() + + return new_location + + +# Kafka consumer configuration +kafka_bootstrap_servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"] +consumer_config = { + 'bootstrap_servers': kafka_bootstrap_servers, + 'group_id': 'location-consumer-group', + 'auto_offset_reset': 'earliest', + 'enable_auto_commit': True, +} + +# Create Kafka consumer +kafka_topic = os.environ["KAFKA_TOPIC"] +consumer = KafkaConsumer(kafka_topic, **consumer_config) + +try: + for msg in consumer: + try: + # Process the received message + location = json.loads(msg.value.decode('utf-8')) + + # Create a person using PersonService + LocationService.create(location) + + logger.info(f'Location created: {location}') + except json.JSONDecodeError as e: + logger.error(f'Error decoding JSON: {e}') + +except KeyboardInterrupt: + pass +finally: + # Close the consumer + consumer.close() \ No newline at end of file diff --git a/modules/location_consumer/requirements.txt b/modules/location_consumer/requirements.txt new file mode 100644 index 000000000..12de1a10a --- /dev/null +++ b/modules/location_consumer/requirements.txt @@ -0,0 +1,27 @@ +aniso8601==7.0.0 +attrs==19.1.0 +Click==7.0 +Flask==1.1.1 +flask-accepts==0.10.0 +flask-cors==3.0.8 +Flask-RESTful==0.3.7 +flask-restplus==0.12.1 +Flask-Script==2.0.6 +Flask-SQLAlchemy==2.4.0 +GeoAlchemy2==0.8.4 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jsonschema==3.0.2 +MarkupSafe==1.1.1 +marshmallow==3.7.1 +marshmallow-sqlalchemy==0.23.1 +psycopg2-binary==2.8.5 +pyrsistent==0.16.0 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.15.0 +shapely==1.7.0 +SQLAlchemy==1.3.19 +Werkzeug==0.16.1 +flask-restx==0.2.0 +kafka-python \ No newline at end of file diff --git a/modules/person/Dockerfile b/modules/person/Dockerfile new file mode 100644 index 000000000..1ef643ff1 --- /dev/null +++ b/modules/person/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.7-alpine + +WORKDIR . + +RUN apk add --no-cache gcc musl-dev linux-headers geos libc-dev postgresql-dev +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +EXPOSE 5000 + +COPY . . +CMD ["flask", "run", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/modules/person/app/__init__.py b/modules/person/app/__init__.py new file mode 100644 index 000000000..627a5c5f6 --- /dev/null +++ b/modules/person/app/__init__.py @@ -0,0 +1,26 @@ +from flask import Flask, jsonify +from flask_cors import CORS +from flask_restx import Api +from flask_sqlalchemy import SQLAlchemy + +db = SQLAlchemy() + + +def create_app(env=None): + from app.config import config_by_name + from app.routes import register_routes + + app = Flask(__name__) + app.config.from_object(config_by_name[env or "test"]) + api = Api(app, title="UdaConnect API", version="0.1.0") + + CORS(app) # Set CORS for development + + register_routes(api, app) + db.init_app(app) + + @app.route("/health") + def health(): + return jsonify("healthy") + + return app diff --git a/modules/person/app/config.py b/modules/person/app/config.py new file mode 100644 index 000000000..827b6a14a --- /dev/null +++ b/modules/person/app/config.py @@ -0,0 +1,58 @@ +import os +from typing import List, Type + +DB_USERNAME = os.environ["DB_USERNAME"] +DB_PASSWORD = os.environ["DB_PASSWORD"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = os.environ["DB_PORT"] +DB_NAME = os.environ["DB_NAME"] + + +class BaseConfig: + CONFIG_NAME = "base" + USE_MOCK_EQUIVALENCY = False + DEBUG = False + SQLALCHEMY_TRACK_MODIFICATIONS = False + + +class DevelopmentConfig(BaseConfig): + CONFIG_NAME = "dev" + SECRET_KEY = os.getenv( + "DEV_SECRET_KEY", "You can't see California without Marlon Widgeto's eyes" + ) + DEBUG = True + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = False + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +class TestingConfig(BaseConfig): + CONFIG_NAME = "test" + SECRET_KEY = os.getenv("TEST_SECRET_KEY", "Thanos did nothing wrong") + DEBUG = True + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = True + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +class ProductionConfig(BaseConfig): + CONFIG_NAME = "prod" + SECRET_KEY = os.getenv("PROD_SECRET_KEY", "I'm Ron Burgundy?") + DEBUG = False + SQLALCHEMY_TRACK_MODIFICATIONS = False + TESTING = False + SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + + +EXPORT_CONFIGS: List[Type[BaseConfig]] = [ + DevelopmentConfig, + TestingConfig, + ProductionConfig, +] +config_by_name = {cfg.CONFIG_NAME: cfg for cfg in EXPORT_CONFIGS} diff --git a/modules/person/app/routes.py b/modules/person/app/routes.py new file mode 100644 index 000000000..c6b1c20eb --- /dev/null +++ b/modules/person/app/routes.py @@ -0,0 +1,5 @@ +def register_routes(api, app, root="api"): + from app.udaconnect import register_routes as attach_udaconnect + + # Add routes + attach_udaconnect(api, app) diff --git a/modules/person/app/udaconnect/__init__.py b/modules/person/app/udaconnect/__init__.py new file mode 100644 index 000000000..684e4f074 --- /dev/null +++ b/modules/person/app/udaconnect/__init__.py @@ -0,0 +1,8 @@ +from app.udaconnect.models import Person # noqa +from app.udaconnect.schemas import PersonSchema # noqa + + +def register_routes(api, app, root="api"): + from app.udaconnect.controllers import api as udaconnect_api + + api.add_namespace(udaconnect_api, path=f"/{root}") diff --git a/modules/person/app/udaconnect/controllers.py b/modules/person/app/udaconnect/controllers.py new file mode 100644 index 000000000..6ebeb5c08 --- /dev/null +++ b/modules/person/app/udaconnect/controllers.py @@ -0,0 +1,37 @@ +from datetime import datetime + +from app.udaconnect.models import Person +from app.udaconnect.schemas import ( + PersonSchema, +) +from app.udaconnect.services import PersonService +from flask import request +from flask_accepts import accepts, responds +from flask_restx import Namespace, Resource +from typing import Optional, List + +DATE_FORMAT = "%Y-%m-%d" + +api = Namespace("UdaConnect", description="Connections via geolocation.") # noqa + + +# TODO: This needs better exception handling + + +@api.route("/persons") +class PersonsResource(Resource): + @accepts(schema=PersonSchema) + @responds(schema=PersonSchema) + def post(self) -> Person: + payload = request.get_json() + new_person: Person = PersonService.create(payload) + return new_person + +@api.route("/persons/") +@api.param("person_id", "Unique ID for a given Person", _in="query") +class PersonResource(Resource): + @responds(schema=PersonSchema) + def get(self, person_id) -> Person: + person: Person = PersonService.retrieve(person_id) + return person + diff --git a/modules/person/app/udaconnect/models.py b/modules/person/app/udaconnect/models.py new file mode 100644 index 000000000..8cac7f6b5 --- /dev/null +++ b/modules/person/app/udaconnect/models.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime + +from app import db # noqa +from geoalchemy2 import Geometry +from geoalchemy2.shape import to_shape +from shapely.geometry.point import Point +from sqlalchemy import BigInteger, Column, Date, DateTime, ForeignKey, Integer, String +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.ext.hybrid import hybrid_property + + +class Person(db.Model): + __tablename__ = "person" + + id = Column(Integer, primary_key=True) + first_name = Column(String, nullable=False) + last_name = Column(String, nullable=False) + company_name = Column(String, nullable=False) + + + +@dataclass +class Connection: + person: Person diff --git a/modules/person/app/udaconnect/schemas.py b/modules/person/app/udaconnect/schemas.py new file mode 100644 index 000000000..879b2136a --- /dev/null +++ b/modules/person/app/udaconnect/schemas.py @@ -0,0 +1,15 @@ +from app.udaconnect.models import Person +from geoalchemy2.types import Geometry as GeometryType +from marshmallow import Schema, fields +from marshmallow_sqlalchemy.convert import ModelConverter as BaseModelConverter + + +class PersonSchema(Schema): + id = fields.Integer() + first_name = fields.String() + last_name = fields.String() + company_name = fields.String() + + class Meta: + model = Person + diff --git a/modules/person/app/udaconnect/services.py b/modules/person/app/udaconnect/services.py new file mode 100644 index 000000000..ee015eb51 --- /dev/null +++ b/modules/person/app/udaconnect/services.py @@ -0,0 +1,48 @@ +import logging +from datetime import datetime, timedelta +from typing import Dict, List + +from app import db +from app.udaconnect.models import Person +from app.udaconnect.schemas import PersonSchema +from geoalchemy2.functions import ST_AsText, ST_Point +from sqlalchemy.sql import text + +import os +import json +from kafka import KafkaProducer + +logging.basicConfig(level=logging.WARNING) +logger = logging.getLogger("person-api") + + +class PersonService: + @staticmethod + def create(person: Dict) -> Person: + new_person = Person() + new_person.first_name = person["first_name"] + new_person.last_name = person["last_name"] + new_person.company_name = person["company_name"] + + kafka_bootstrap_servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"] + producer_config = { + 'bootstrap_servers': kafka_bootstrap_servers, + 'client_id': 'location-producer', + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } + + producer = KafkaProducer(**producer_config) + + kafka_topic = 'person-topic' + producer.send(kafka_topic, value=person) + + return new_person + + @staticmethod + def retrieve(person_id: int) -> Person: + person = db.session.query(Person).get(person_id) + return person + + @staticmethod + def retrieve_all() -> List[Person]: + return db.session.query(Person).all() diff --git a/modules/person/requirements.txt b/modules/person/requirements.txt new file mode 100644 index 000000000..12de1a10a --- /dev/null +++ b/modules/person/requirements.txt @@ -0,0 +1,27 @@ +aniso8601==7.0.0 +attrs==19.1.0 +Click==7.0 +Flask==1.1.1 +flask-accepts==0.10.0 +flask-cors==3.0.8 +Flask-RESTful==0.3.7 +flask-restplus==0.12.1 +Flask-Script==2.0.6 +Flask-SQLAlchemy==2.4.0 +GeoAlchemy2==0.8.4 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jsonschema==3.0.2 +MarkupSafe==1.1.1 +marshmallow==3.7.1 +marshmallow-sqlalchemy==0.23.1 +psycopg2-binary==2.8.5 +pyrsistent==0.16.0 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.15.0 +shapely==1.7.0 +SQLAlchemy==1.3.19 +Werkzeug==0.16.1 +flask-restx==0.2.0 +kafka-python \ No newline at end of file diff --git a/modules/person/wsgi.py b/modules/person/wsgi.py new file mode 100644 index 000000000..63fc43373 --- /dev/null +++ b/modules/person/wsgi.py @@ -0,0 +1,7 @@ +import os + +from app import create_app + +app = create_app(os.getenv("FLASK_ENV") or "test") +if __name__ == "__main__": + app.run(debug=True) diff --git a/modules/person_consumer/Dockerfile b/modules/person_consumer/Dockerfile new file mode 100644 index 000000000..c99e98bbe --- /dev/null +++ b/modules/person_consumer/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.7-alpine + +WORKDIR . + +RUN apk add --no-cache gcc musl-dev linux-headers geos libc-dev postgresql-dev +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . +CMD ["python", "app/app.py"] \ No newline at end of file diff --git a/modules/person_consumer/app/app.py b/modules/person_consumer/app/app.py new file mode 100644 index 000000000..a6175dc09 --- /dev/null +++ b/modules/person_consumer/app/app.py @@ -0,0 +1,102 @@ +import logging +from datetime import datetime, timedelta +from typing import Dict, List + +from geoalchemy2.functions import ST_AsText, ST_Point +from sqlalchemy import create_engine, Column, Integer, String +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from marshmallow import Schema, fields + +import os +import json +from kafka import KafkaConsumer + +DB_USERNAME = os.environ["DB_USERNAME"] +DB_PASSWORD = os.environ["DB_PASSWORD"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = os.environ["DB_PORT"] +DB_NAME = os.environ["DB_NAME"] + +SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + +logging.basicConfig(level= logging.INFO) + +logging.info(SQLALCHEMY_DATABASE_URI) + +engine = create_engine(SQLALCHEMY_DATABASE_URI) +Session = sessionmaker(bind=engine) +db = Session() + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("person-consumer") + + +Base = declarative_base() + +class Person(Base): + __tablename__ = 'person' + + id = Column(Integer, primary_key=True) + first_name = Column(String) + last_name = Column(String) + company_name = Column(String) + +class PersonSchema(Schema): + id = fields.Integer() + first_name = fields.String() + last_name = fields.String() + company_name = fields.String() + + class Meta: + model = Person + + +class PersonService: + @staticmethod + def create(person: Dict) -> Person: + new_person = Person() + new_person.first_name = person["first_name"] + new_person.last_name = person["last_name"] + new_person.company_name = person["company_name"] + + db.add(new_person) + db.commit() + + return new_person + + +# Kafka consumer configuration +kafka_bootstrap_servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"] +consumer_config = { + 'bootstrap_servers': kafka_bootstrap_servers, + 'group_id': 'person-consumer-group', + 'auto_offset_reset': 'earliest', + 'enable_auto_commit': True, +} + +# Create Kafka consumer +kafka_topic = os.environ["KAFKA_TOPIC"] +consumer = KafkaConsumer(kafka_topic, **consumer_config) + +try: + for msg in consumer: + try: + # Process the received message + person_data = json.loads(msg.value.decode('utf-8')) + + # Create a person using PersonService + PersonService.create(person_data) + + logger.info(f'Person created: {person_data}') + except json.JSONDecodeError as e: + logger.error(f'Error decoding JSON: {e}') + +except KeyboardInterrupt: + pass +finally: + # Close the consumer + consumer.close() \ No newline at end of file diff --git a/modules/person_consumer/requirements.txt b/modules/person_consumer/requirements.txt new file mode 100644 index 000000000..12de1a10a --- /dev/null +++ b/modules/person_consumer/requirements.txt @@ -0,0 +1,27 @@ +aniso8601==7.0.0 +attrs==19.1.0 +Click==7.0 +Flask==1.1.1 +flask-accepts==0.10.0 +flask-cors==3.0.8 +Flask-RESTful==0.3.7 +flask-restplus==0.12.1 +Flask-Script==2.0.6 +Flask-SQLAlchemy==2.4.0 +GeoAlchemy2==0.8.4 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jsonschema==3.0.2 +MarkupSafe==1.1.1 +marshmallow==3.7.1 +marshmallow-sqlalchemy==0.23.1 +psycopg2-binary==2.8.5 +pyrsistent==0.16.0 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.15.0 +shapely==1.7.0 +SQLAlchemy==1.3.19 +Werkzeug==0.16.1 +flask-restx==0.2.0 +kafka-python \ No newline at end of file diff --git a/modules/person_rpc/Dockerfile b/modules/person_rpc/Dockerfile new file mode 100644 index 000000000..0dfc8239e --- /dev/null +++ b/modules/person_rpc/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.7-alpine + +WORKDIR . + +RUN apk add --no-cache gcc musl-dev linux-headers geos libc-dev postgresql-dev +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +EXPOSE 50051 + +COPY . . +CMD ["python", "app/server.py"] \ No newline at end of file diff --git a/modules/person_rpc/app/client.py b/modules/person_rpc/app/client.py new file mode 100644 index 000000000..99b8b4db2 --- /dev/null +++ b/modules/person_rpc/app/client.py @@ -0,0 +1,29 @@ +from __future__ import print_function + +import logging + +import grpc +import persons_pb2 +import persons_pb2_grpc +import os +from typing import Dict, List + +grpc_server_address = os.environ["GRPC_SERVER_ADDRESS"] + + +def run(): + # NOTE(gRPC Python Team): .close() is possible on a channel and should be + # used in circumstances in which the with statement does not fit the needs + # of the code. + with grpc.insecure_channel(grpc_server_address) as channel: + stub = persons_pb2_grpc.PersonServiceStub(channel=channel) + logging.info("get list user") + request = persons_pb2.RetrieveAllPersonsRequest() + list_persons: persons_pb2.ListPerson = stub.RetrieveAllPersons(request) + for person in list_persons.persons: + logging.info("id: %s, name: %s, company: %s", person.id, person.first_name, person.company_name) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + run() \ No newline at end of file diff --git a/modules/person_rpc/app/persons_pb2.py b/modules/person_rpc/app/persons_pb2.py new file mode 100644 index 000000000..c328841be --- /dev/null +++ b/modules/person_rpc/app/persons_pb2.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: persons.proto +# Protobuf Python Version: 4.25.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rpersons.proto\x12\nudaconnect\"Q\n\x06Person\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\x14\n\x0c\x63ompany_name\x18\x04 \x01(\t\"1\n\nListPerson\x12#\n\x07persons\x18\x01 \x03(\x0b\x32\x12.udaconnect.Person\"\x1b\n\x19RetrieveAllPersonsRequest2d\n\rPersonService\x12S\n\x12RetrieveAllPersons\x12%.udaconnect.RetrieveAllPersonsRequest\x1a\x16.udaconnect.ListPersonb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'persons_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_PERSON']._serialized_start=29 + _globals['_PERSON']._serialized_end=110 + _globals['_LISTPERSON']._serialized_start=112 + _globals['_LISTPERSON']._serialized_end=161 + _globals['_RETRIEVEALLPERSONSREQUEST']._serialized_start=163 + _globals['_RETRIEVEALLPERSONSREQUEST']._serialized_end=190 + _globals['_PERSONSERVICE']._serialized_start=192 + _globals['_PERSONSERVICE']._serialized_end=292 +# @@protoc_insertion_point(module_scope) diff --git a/modules/person_rpc/app/persons_pb2_grpc.py b/modules/person_rpc/app/persons_pb2_grpc.py new file mode 100644 index 000000000..3f496f1ec --- /dev/null +++ b/modules/person_rpc/app/persons_pb2_grpc.py @@ -0,0 +1,66 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import persons_pb2 as persons__pb2 + + +class PersonServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.RetrieveAllPersons = channel.unary_unary( + '/udaconnect.PersonService/RetrieveAllPersons', + request_serializer=persons__pb2.RetrieveAllPersonsRequest.SerializeToString, + response_deserializer=persons__pb2.ListPerson.FromString, + ) + + +class PersonServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def RetrieveAllPersons(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_PersonServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'RetrieveAllPersons': grpc.unary_unary_rpc_method_handler( + servicer.RetrieveAllPersons, + request_deserializer=persons__pb2.RetrieveAllPersonsRequest.FromString, + response_serializer=persons__pb2.ListPerson.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'udaconnect.PersonService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class PersonService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def RetrieveAllPersons(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/udaconnect.PersonService/RetrieveAllPersons', + persons__pb2.RetrieveAllPersonsRequest.SerializeToString, + persons__pb2.ListPerson.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/modules/person_rpc/app/server.py b/modules/person_rpc/app/server.py new file mode 100644 index 000000000..c68d12032 --- /dev/null +++ b/modules/person_rpc/app/server.py @@ -0,0 +1,64 @@ +from concurrent import futures +import os +import grpc +from sqlalchemy import create_engine, Column, Integer, String +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +import logging +import persons_pb2 +import persons_pb2_grpc + +DB_USERNAME = os.environ["DB_USERNAME"] +DB_PASSWORD = os.environ["DB_PASSWORD"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = os.environ["DB_PORT"] +DB_NAME = os.environ["DB_NAME"] + +SQLALCHEMY_DATABASE_URI = ( + f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) + +logging.basicConfig(level= logging.INFO) + +logging.info(SQLALCHEMY_DATABASE_URI) + +engine = create_engine(SQLALCHEMY_DATABASE_URI) +Session = sessionmaker(bind=engine) +db = Session() + +Base = declarative_base() + +class Person(Base): + __tablename__ = 'person' + + id = Column(Integer, primary_key=True) + first_name = Column(String) + last_name = Column(String) + company_name = Column(String) + +class PersonServicer(persons_pb2_grpc.PersonServiceServicer): + def RetrieveAllPersons(self, request, context): + persons = db.query(Person).all() + logging.info(persons) + + grpc_persons = persons_pb2.ListPerson(persons=[ + persons_pb2.Person(id=p.id, first_name=p.first_name, last_name=p.last_name, company_name=p.company_name) + for p in persons + ]) + + return grpc_persons + +def serve(): + logging.info("Server is starting...") + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + persons_pb2_grpc.add_PersonServiceServicer_to_server( + PersonServicer(), server + ) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + logging.info("Server started") + + +if __name__ == "__main__": + serve() \ No newline at end of file diff --git a/modules/person_rpc/requirements.txt b/modules/person_rpc/requirements.txt new file mode 100644 index 000000000..3393643b6 --- /dev/null +++ b/modules/person_rpc/requirements.txt @@ -0,0 +1,28 @@ +aniso8601==7.0.0 +attrs==19.1.0 +Click==7.0 +Flask==1.1.1 +flask-accepts==0.10.0 +flask-cors==3.0.8 +Flask-RESTful==0.3.7 +flask-restplus==0.12.1 +Flask-Script==2.0.6 +Flask-SQLAlchemy==2.4.0 +GeoAlchemy2==0.8.4 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jsonschema==3.0.2 +MarkupSafe==1.1.1 +marshmallow==3.7.1 +marshmallow-sqlalchemy==0.23.1 +psycopg2-binary==2.8.5 +pyrsistent==0.16.0 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.15.0 +shapely==1.7.0 +SQLAlchemy==1.3.19 +Werkzeug==0.16.1 +flask-restx==0.2.0 +grpcio==1.60.0 +protobuf diff --git a/modules/person_rpc/wsgi.py b/modules/person_rpc/wsgi.py new file mode 100644 index 000000000..fade9cb1f --- /dev/null +++ b/modules/person_rpc/wsgi.py @@ -0,0 +1,7 @@ +import os + +from modules.person_rpc.app.server import create_app + +app = create_app(os.getenv("FLASK_ENV") or "test") +if __name__ == "__main__": + app.run(debug=True) diff --git a/notes.md b/notes.md new file mode 100644 index 000000000..413b6a44b --- /dev/null +++ b/notes.md @@ -0,0 +1,109 @@ +# k3s: Lightweight Kubernetes +```shell +# Install +curl -sfL https://get.k3s.io | sh - + +# Config kubectl +sudo cp /etc/rancher/k3s/k3s.yaml /home/${USER}/.kube/config +sudo chown ${USER} ~/.kube/config +chmod 600 ~/.kube/config + +``` + +# Argocd: Declarative continuous delivery with a fully-loaded UI + +https://argo-cd.readthedocs.io/en/stable/getting_started +## Install Argo CD¶ +```shell +# namespace argocd +kubectl create namespace argocd + +# apply +kubectl apply -n argocd -f https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml + +``` +## Access The Argo CD API Server + +```shell +# Service Type Load Balancer +kubectl patch svc argocd-server -n argocd -p '{"spec": {"type": "LoadBalancer"}}' + +# Port Forwarding +kubectl port-forward svc/argocd-server -n argocd 8080:443 + +``` + +## Download Argo CD CLI +```shell +Download the latest Argo CD version from https://github.com/argoproj/argo-cd/releases/latest. More detailed installation instructions can be found via the CLI installation documentation. +``` + +## Login Using The CLI +```shell +# The initial password for the admin account is auto-generated and stored as clear text in the field password in a secret named argocd-initial-admin-secret in your Argo CD installation namespace. You can simply retrieve this password using the argocd CLI: +argocd admin initial-password -n argocd + +# Using the username admin and the password from above, login to Argo CD's IP or hostname: +argocd login + +# Change the password using the command: +argocd account update-password + +``` + +# Helm: The package manager for Kubernetes +https://helm.sh + +## Install Helm + +https://helm.sh/docs/intro/install/ + +```shell + +curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 +chmod 700 get_helm.sh +./get_helm.sh + +``` + +## Helm cheatsheet + +https://helm.sh/docs/intro/cheatsheet/ + +# Prometheus: Monitoring system & time series database + +https://prometheus.io/ + + +# Grafana: open source, distributed tracing platform + +https://grafana.com/ + + +# Jaeger: open source, distributed tracing platform + +https://www.jaegertracing.io/ + +# gRPC: A high performance, open source universal RPC framework + +https://grpc.io/ + +## gRPC Python + +https://grpc.io/docs/languages/python/quickstart/ + +``` + +``` + +# Postgres + +```shell + +kubectl apply -f deployment/db-configmap.yaml + +kubectl apply -f deployment/db-secret.yaml + +kubectl apply -f deployment/postgres.yaml + +``` \ No newline at end of file diff --git a/protobufs/REAME.md b/protobufs/REAME.md new file mode 100644 index 000000000..74dc4c6ad --- /dev/null +++ b/protobufs/REAME.md @@ -0,0 +1,82 @@ +# The RPC Server + +## Generate Python code from the protobufs +```shell + +# person_rpc +python3 -m grpc_tools.protoc -I ./protobufs --python_out=./modules/person_rpc/app --grpc_python_out=./modules/person_rpc/app ./protobufs/*.proto + +# connection +python3 -m grpc_tools.protoc -I ./protobufs --python_out=./modules/connection --grpc_python_out=./modules/connection ./protobufs/*.proto + +``` + +## The RPC Client +```shell + +``` + +``` +flask run --host 0.0.0.0 +``` + +```shell + +# location + +curl -X POST \ + http://localhost:5001/api/locations \ + -H 'Content-Type: application/json' \ + -d '{ + "latitude": "-122.290524", + "longitude": "37.553441", + "creation_time": "2020-08-18T10:37:06", + "person_id": 29 +}' + + +curl -X POST \ + http://localhost:5000/api/locations \ + -H 'Content-Type: application/json' \ + -d '{ + "latitude": "-122.290524", + "longitude": "37.553441", + "creation_time": "2020-08-18T10:37:06", + "person_id": 29 +}' + + +# person +curl -X POST \ + http://localhost:5001/api/persons \ + -H 'Content-Type: application/json' \ + -d '{ + "first_name": "John", + "last_name": "Doe", + "company_name": "ABC Corp" + }' + + + curl -X POST \ + http://localhost:5001/api/persons \ + -H 'Content-Type: application/json' \ + -d '{ + "first_name": "John", + "last_name": "Doe", + "company_name": "ABC Corp" + }' + +``` + +```shell + +bin/kafka-topics.sh --create --topic location-topic --bootstrap-server localhost:9092 + +bin/kafka-console-consumer.sh --topic location-topic --from-beginning --bootstrap-server localhost:9092 + + +bin/kafka-topics.sh --create --topic person-topic --bootstrap-server localhost:9092 + +bin/kafka-console-consumer.sh --topic person-topic --from-beginning --bootstrap-server localhost:9092 + +``` \ No newline at end of file diff --git a/protobufs/persons.proto b/protobufs/persons.proto new file mode 100644 index 000000000..705737721 --- /dev/null +++ b/protobufs/persons.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package udaconnect; + +message Person { + int32 id = 1; + string first_name = 2; + string last_name = 3; + string company_name = 4; +} + +message ListPerson { + repeated Person persons = 1; +} + +message RetrieveAllPersonsRequest {} + +service PersonService { + rpc RetrieveAllPersons(RetrieveAllPersonsRequest) returns (ListPerson); +} diff --git a/scripts/run_db_command.sh b/scripts/run_db_command.sh old mode 100644 new mode 100755