Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 19, 2024
0 parents commit 39f81c1
Show file tree
Hide file tree
Showing 29 changed files with 869 additions and 0 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: CI
on:
  pull_request:
    branches: ['**']
  push:
    branches: ['**']
    tags: [v*]
jobs:
  ci:
    # run on external PRs, but not on internal PRs since those will be run by push to branch
    if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
    # ubuntu-20.04 GitHub-hosted runners are retired; use a supported image
    runs-on: ubuntu-22.04
    steps:
      - name: Checkout
        # checkout@v2 runs on the deprecated node12 runtime; v4 is the supported release
        uses: actions/checkout@v4
      - name: Set up JDK 11
        # setup-java@v1 is deprecated; v2+ requires an explicit distribution
        uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: 11
      - name: Cache sbt
        # cache@v2 is deprecated (node12); v4 keeps the same inputs
        uses: actions/cache@v4
        with:
          path: |
            ~/.sbt
            ~/.ivy2/cache
            ~/.coursier
          key: sbt-cache-${{ runner.os }}-${{ hashFiles('project/build.properties') }}
      - name: Compile
        run: sbt -v compile
      - name: Test
        run: sbt -v test
      # Trim transient resolver files so the cache stays small and stable between runs
      - name: Cleanup
        run: |
          rm -rf "$HOME/.ivy2/local" || true
          find $HOME/.ivy2/cache -name "ivydata-*.properties" -delete || true
          find $HOME/.ivy2/cache -name "*-LM-SNAPSHOT*" -delete || true
          find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete || true
          find $HOME/.sbt -name "*.lock" -delete || true
28 changes: 28 additions & 0 deletions .github/workflows/scala-steward.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Scala Steward

# This workflow will launch at 00:00 every day
on:
  schedule:
    - cron: '0 0 * * *'
  workflow_dispatch:

jobs:
  scala-steward:
    runs-on: ubuntu-22.04
    steps:
      - name: Checkout
        # checkout@v3 runs on the deprecated node16 runtime; v4 is the supported release
        uses: actions/checkout@v4
      - name: Set up JDK 11
        # setup-java@v3 is node16-based; v4 keeps the same inputs
        uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: 11
          cache: 'sbt'
      - name: Launch Scala Steward
        uses: scala-steward-org/scala-steward-action@v2
        with:
          author-name: scala-steward
          # NOTE(review): this is not a valid e-mail address — consider
          # scala-steward@users.noreply.github.com; confirm against the action docs
          author-email: scala-steward
          github-token: ${{ secrets.REPO_GITHUB_TOKEN }}
          repo-config: .scala-steward.conf
          ignore-opts-files: false
25 changes: 25 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Compiled output and logs
*.class
*.log

# sbt / Scala build artifacts and local environment files
.cache
.env
.envrc
.history
.sdkmanrc
.lib/
dist/*
target/
lib_managed/
local.conf
src_managed/
project/boot/
project/plugins/project/

# IntelliJ IDEA
.idea*

# Metals
.metals/
.bsp/
.bloop/
metals.sbt
.vscode
49 changes: 49 additions & 0 deletions .mergify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Mergify automation: auto-merge single-file dependency-update PRs opened by
# the softwaremill-ci bot once CI is green, and clean up merged branches.
pull_request_rules:
  - name: delete head branch after merge
    conditions: []
    actions:
      delete_head_branch: {}
  # Bot PRs touching exactly one well-known build file merge without review.
  - name: automatic merge for softwaremill-ci pull requests affecting build.sbt
    conditions:
      - author=softwaremill-ci
      - check-success=ci
      - "#files=1"
      - files=build.sbt
    actions:
      merge:
        method: merge
  - name: automatic merge for softwaremill-ci pull requests affecting project plugins.sbt
    conditions:
      - author=softwaremill-ci
      - check-success=ci
      - "#files=1"
      - files=project/plugins.sbt
    actions:
      merge:
        method: merge
  # Any other bot PR still needs one human approval before it merges.
  - name: semi-automatic merge for softwaremill-ci pull requests
    conditions:
      - author=softwaremill-ci
      - check-success=ci
      - "#approved-reviews-by>=1"
    actions:
      merge:
        method: merge
  - name: automatic merge for softwaremill-ci pull requests affecting project build.properties
    conditions:
      - author=softwaremill-ci
      - check-success=ci
      - "#files=1"
      - files=project/build.properties
    actions:
      merge:
        method: merge
  - name: automatic merge for softwaremill-ci pull requests affecting .scalafmt.conf
    conditions:
      - author=softwaremill-ci
      - check-success=ci
      - "#files=1"
      - files=.scalafmt.conf
    actions:
      merge:
        method: merge
3 changes: 3 additions & 0 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Ignore scala-compiler updates matching the "2.13." version prefix, so
# Scala Steward does not propose 2.13.x compiler bumps for this Scala 3 build.
updates.ignore = [
  {groupId = "org.scala-lang", artifactId = "scala-compiler", version = "2.13."},
]
4 changes: 4 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
version = 3.8.0
# 80 columns — presumably so code fits on presentation slides
maxColumn = 80
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
runner.dialect = scala3
28 changes: 28 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import com.softwaremill.SbtSoftwareMillCommon.commonSmlBuildSettings

// Shared settings for all modules: SoftwareMill build defaults plus
// project coordinates and the Scala version.
lazy val commonSettings = commonSmlBuildSettings ++ Seq(
  organization := "com.softwaremill.kafka",
  scalaVersion := "3.5.2"
)

val scalaTest = "org.scalatest" %% "scalatest" % "3.2.18" % Test

// Aggregating root project; publishes no artifact itself.
lazy val rootProject = (project in file("."))
  .settings(commonSettings: _*)
  .settings(publishArtifact := false, name := "kafka-ox-pres")
  .aggregate(core)

// Presentation code: Ox/Tapir (direct style) alongside Pekko Streams, so the
// two Kafka streaming approaches can be shown side by side.
lazy val core: Project = (project in file("core"))
  .settings(commonSettings: _*)
  .settings(
    name := "core",
    libraryDependencies ++= Seq(
      "com.softwaremill.sttp.tapir" %% "tapir-netty-server-sync" % "1.11.9",
      "com.softwaremill.ox" %% "kafka" % "0.5.3",
      "ch.qos.logback" % "logback-classic" % "1.5.8",
      "org.apache.pekko" %% "pekko-connectors-kafka" % "1.1.0",
      "org.apache.pekko" %% "pekko-stream" % "1.1.2",
      "com.softwaremill.sttp.client4" %% "core" % "4.0.0-M19",
      scalaTest
    )
  )
12 changes: 12 additions & 0 deletions core/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Logback: log INFO and above to stdout with timestamp, thread and logger. -->
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package pres

/*
 _____ _ _ _ _ _ _
|  _  |_| |___ _____  | | | |___ ___ ___| |_|_|
|     | . | .'|     | | | | | .'|  _|_ -| '_| |
|__|__|___|__,|_|_|_| |_____|__,|_| |___|_,_|_|
██╗  ██╗ █████╗ ███████╗██╗  ██╗ █████╗
██║ ██╔╝██╔══██╗██╔════╝██║ ██╔╝██╔══██╗
█████╔╝ ███████║█████╗  █████╔╝ ███████║
██╔═██╗ ██╔══██║██╔══╝  ██╔═██╗ ██╔══██║
██║  ██╗██║  ██║██║     ██║  ██╗██║  ██║
╚═╝  ╚═╝╚═╝  ╚═╝╚═╝     ╚═╝  ╚═╝╚═╝  ╚═╝
*/
// Title slide of the presentation; the class exists only so this file compiles.
class S010_Fast_blocking_Kafka_streaming_on_the_JVM
27 changes: 27 additions & 0 deletions core/src/main/scala/pres/S020_Virtual_threads.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pres

import ox.discard
import java.util.concurrent.ConcurrentHashMap

/** Starts ten million virtual threads, each adding a value into a shared set,
  * then waits for all of them — demonstrating how cheap Loom threads are.
  * `timed` (defined elsewhere in this project) reports the elapsed time.
  */
@main def s020_Virtual_threads() =
  timed("loom") {
    val threadCount = 10_000_000
    val results = ConcurrentHashMap.newKeySet[Int]()

    // Launch one virtual thread per slot; each registers its (constant) result.
    val threads = Array.tabulate(threadCount) { _ =>
      Thread.ofVirtual().start(() => results.add(0).discard)
    }

    // Block until every virtual thread has completed.
    threads.foreach(_.join())
  }
12 changes: 12 additions & 0 deletions core/src/main/scala/pres/S030_Why_Loom.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pres

// Slide: motivation for Project Loom (virtual threads).
class S030_Why_Loom {
  /*
  Loom principles:
  * direct syntax, blocking code
  * non-viral
  * meaningful stack traces
   */
}
19 changes: 19 additions & 0 deletions core/src/main/scala/pres/S040_Why_reactive_streams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pres

// Slide: what reactive streams are and when to reach for them.
class S040_Why_reactive_streams {
  /*
  [Reactive streams](https://www.reactive-streams.org) are useful!
  What are they?
  * "govern the exchange of stream data across an asynchronous boundary"
  * within bounded memory
  When to use RS?
  * process streaming data
  * manage concurrency
  * interface with I/O operations
  * safely handle errors
   */
}
81 changes: 81 additions & 0 deletions core/src/main/scala/pres/S050_Pekko_streams_example.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package pres

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.CommitterSettings
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.pekko.kafka.ProducerMessage
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.Subscriptions
import org.apache.pekko.kafka.scaladsl.Committer
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.pekko.kafka.scaladsl.Producer
import org.slf4j.LoggerFactory
import ox.discard
import ox.get
import sttp.client4.*
import sttp.client4.httpclient.HttpClientFutureBackend

import scala.concurrent.Await
import scala.concurrent.duration.Duration

/** Pekko Streams variant of the Kafka pipeline: consume from `t1`, enrich each
  * message via two HTTP calls, produce the combined result to `t2`, and commit
  * offsets only after the result has been produced (at-least-once delivery).
  */
@main def s050_Pekko_streams_example(): Unit =
  val sourceTopic = "t1"
  val destTopic = "t2"
  val group = "g1"

  val logger = LoggerFactory.getLogger("pekko-streams-example")

  given system: ActorSystem = ActorSystem("transfer")
  try
    import system.dispatcher

    // bootstrapServer is defined elsewhere in this project — presumably a
    // local Kafka address; confirm against the companion sources.
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServer)
    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServer)
        .withGroupId(group)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val backend = HttpClientFutureBackend()

    val stream = Consumer
      // committableSource carries the offset with each record so it can be
      // committed only after downstream processing succeeds
      .committableSource(consumerSettings, Subscriptions.topics(sourceTopic))
      // up to 5 messages processed concurrently, results emitted in order
      .mapAsync(5) { commitableMsg =>
        val msg = commitableMsg.record.value()
        // Both futures are created before the for-comprehension, so the two
        // HTTP calls run in parallel.
        val work1 = basicRequest
          .response(asString.getRight)
          .post(uri"http://localhost:8080/work1")
          .body(msg)
          .send(backend)
        val work2 = basicRequest
          .response(asString.getRight)
          .post(uri"http://localhost:8080/work2")
          .body(msg)
          .send(backend)

        for {
          response1 <- work1
          response2 <- work2
          result = response1.body + ": " + response2.body
          _ = logger.info(s"Result for $msg: $result")
        } yield ProducerMessage.single(
          new ProducerRecord[String, String](destTopic, null, result),
          commitableMsg.committableOffset
        )
      }
      // produce to the destination topic, passing the offset through
      .via(Producer.flexiFlow(producerSettings))
      .map(_.passThrough)
      // commit offsets in batches; DrainingControl allows graceful shutdown
      .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
      .run()
      .streamCompletion

    stream.get().discard
  finally system.terminate().get().discard
end s050_Pekko_streams_example
29 changes: 29 additions & 0 deletions core/src/main/scala/pres/S060_Structured_concurrency.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pres

import ox.*

import scala.concurrent.duration.DurationInt

/** Two forks run concurrently inside a `supervised` scope; the scope only
  * ends once both have completed, and their results are combined.
  */
@main def s060_Structured_concurrency_1(): Unit =
  supervised {
    val slower = fork {
      sleep(2.seconds)
      1
    }
    val faster = fork {
      sleep(1.second)
      2
    }
    println(slower.join() + faster.join())
  }

/** As above, but the second fork throws: the supervised scope interrupts the
  * sibling fork and re-throws, so the println is never reached.
  */
@main def s060_Structured_concurrency_2(): Unit =
  supervised {
    val slower = fork {
      sleep(2.seconds)
      1
    }
    val failing = fork[Int] {
      sleep(1.second)
      throw new RuntimeException("boom")
    }
    println(slower.join() + failing.join())
  }
Loading

0 comments on commit 39f81c1

Please sign in to comment.