Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
229dc7f
Initial plan
Copilot Oct 5, 2025
031c80c
WI10: Initial setup - create work item and verify build
Copilot Oct 5, 2025
8f24b1d
WI10: Fix Kafka default port in FlinkJobRunner (9093 -> 9092)
Copilot Oct 5, 2025
1df81f0
WI10: Add KAFKA_BOOTSTRAP env and document Flink container issue
Copilot Oct 5, 2025
aa7d19e
Re-enable AppHost disposal and add docker ps debug logging
Copilot Oct 5, 2025
7c900c2
Add comprehensive Flink container debugging logs
Copilot Oct 5, 2025
bbe1ebc
Add job status checks to identify container teardown issue
Copilot Oct 5, 2025
c558944
Fix Aspire service discovery endpoint names for Flink connectivity
Copilot Oct 5, 2025
8d4d07a
Remove LOCAL mode execution, reject requests when cluster unhealthy
Copilot Oct 5, 2025
7f33ae7
Add Kafka connectivity diagnostics and fix comment about Kafka ports
Copilot Oct 5, 2025
a5239df
Remove unused LOCAL execution methods and fix code analysis warnings
Copilot Oct 5, 2025
0264c67
Add persistent container lifetime for debugging container disappearance
Copilot Oct 6, 2025
136617f
Fix persistent container lifetime - containers now stay alive after t…
Copilot Oct 6, 2025
69cb481
Update WI10 with root cause analysis - Aspire proxy port issue
Copilot Oct 6, 2025
f3484ad
Complete WI10 with comprehensive root cause analysis and solution
Copilot Oct 6, 2025
bc8b3b7
Fix Kafka port discovery - remove Aspire connection string fallback
Copilot Oct 6, 2025
8a70468
Improve Kafka port discovery with extended retries and debug logging
Copilot Oct 6, 2025
b0d6da8
Add container creation wait before port discovery
Copilot Oct 6, 2025
a710c21
Revert persistent container lifetime - let Aspire manage lifecycle
Copilot Oct 6, 2025
664116c
Increase container creation wait to 30 seconds
Copilot Oct 6, 2025
b56cf53
Add detailed container debug logging to diagnose Aspire behavior
Copilot Oct 6, 2025
e1bcb5d
Fix SQL Gateway startup args to use correct Flink command
Copilot Oct 6, 2025
de0db58
Init
devstress Oct 6, 2025
4de1ab9
init
devstress Oct 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,19 @@

<!-- Determine which Maven command to use (cross-platform) -->
<PropertyGroup>
<!-- Clean up Maven path output (remove trailing newlines/whitespace) -->
<MavenFullPath Condition="'$(MavenFullPathRaw)' != ''">$([System.String]::Copy('$(MavenFullPathRaw)').Trim().Split(&#xD;&#xA;, System.StringSplitOptions.RemoveEmptyEntries)[0].Trim())</MavenFullPath>

<!-- Priority 1: Use full path to system Maven if found in PATH (e.g., from GitHub Actions) -->
<MavenCommand Condition="'$(MavenFoundExitCode)' == '0' AND '$(MavenFullPath)' != ''">$(MavenFullPath)</MavenCommand>
<!-- Priority 2: Use relative mvn if found in PATH but full path not captured -->
<MavenCommand Condition="'$(MavenCommand)' == '' AND '$(MavenFoundExitCode)' == '0'">mvn</MavenCommand>
<!-- Windows 'where' command returns multiple matches separated by newlines -->
<!-- Just skip parsing 'where' output and use simple approach: mvn.cmd on Windows, mvn on Linux -->
<!-- Priority 1: Use mvn.cmd if found in PATH on Windows -->
<MavenCommand Condition="'$(MavenFoundExitCode)' == '0' AND '$(IsWindows)' == 'true'">mvn.cmd</MavenCommand>
<!-- Priority 2: Use mvn if found in PATH on Linux/macOS -->
<MavenCommand Condition="'$(MavenFoundExitCode)' == '0' AND ('$(IsLinux)' == 'true' OR '$(IsMacOS)' == 'true')">mvn</MavenCommand>
<!-- Priority 3: Use locally installed Maven if it exists (Windows) -->
<MavenCommand Condition="'$(MavenCommand)' == '' AND '$(IsWindows)' == 'true' AND Exists('$(MavenInstallDir)\bin\mvn.cmd')">$(MavenInstallDir)\bin\mvn.cmd</MavenCommand>
<!-- Priority 4: Use locally installed Maven if it exists (Linux/macOS) -->
<MavenCommand Condition="'$(MavenCommand)' == '' AND ('$(IsLinux)' == 'true' OR '$(IsMacOS)' == 'true') AND Exists('$(MavenInstallDir)/bin/mvn')">$(MavenInstallDir)/bin/mvn</MavenCommand>
<!-- Fallback: Use mvn from PATH (should be available if checks passed) -->
<MavenCommand Condition="'$(MavenCommand)' == ''">mvn</MavenCommand>
<!-- Fallback: Use mvn.cmd on Windows, mvn on Linux/macOS -->
<MavenCommand Condition="'$(MavenCommand)' == '' AND '$(IsWindows)' == 'true'">mvn.cmd</MavenCommand>
<MavenCommand Condition="'$(MavenCommand)' == '' AND ('$(IsLinux)' == 'true' OR '$(IsMacOS)' == 'true')">mvn</MavenCommand>
</PropertyGroup>

<!-- Ensure JAVA_HOME is set and valid for Maven (cross-platform) -->
Expand Down
227 changes: 47 additions & 180 deletions FlinkDotNet/Flink.JobGateway/Services/FlinkJobManager.cs

Large diffs are not rendered by default.

Binary file modified FlinkDotNet/Flink.JobGateway/flink-ir-runner-java17.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ public static void main(String[] args) throws Exception {
return; // No further DataStream processing for pure SQL jobs
} else if (ir.source instanceof KafkaSourceDefinition) {
KafkaSourceDefinition k = (KafkaSourceDefinition) ir.source;
String bootstrap = orElse(k.bootstrapServers, System.getenv("KAFKA_BOOTSTRAP"), "kafka:9093");
String bootstrap = orElse(k.bootstrapServers, System.getenv("KAFKA_BOOTSTRAP"), "kafka:9092");
String groupId = orElse(k.groupId, "flinkdotnet-ir-runner");

System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");
System.out.println("[KAFKA SOURCE] Configuration:");
System.out.println(" - bootstrapServers field from JSON: " + k.bootstrapServers);
System.out.println(" - KAFKA_BOOTSTRAP environment: " + System.getenv("KAFKA_BOOTSTRAP"));
System.out.println(" - FINAL bootstrap.servers: " + bootstrap);
System.out.println(" - Topic: " + k.topic);
System.out.println(" - GroupId: " + groupId);
System.out.println(" - Starting offsets: " + orElse(k.startingOffsets, "latest"));
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");

Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
Expand All @@ -127,13 +127,13 @@ public static void main(String[] args) throws Exception {
if (op instanceof MapOperationDefinition) {
MapOperationDefinition m = (MapOperationDefinition) op;
String expr = orElse(m.expression, m.function, "identity");
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");
System.out.println("[MAP OPERATION] Processing:");
System.out.println(" - expression field from JSON: " + m.expression);
System.out.println(" - function field from JSON: " + m.function);
System.out.println(" - Resolved expression: " + expr);
System.out.println(" - Normalized (lowercase): " + expr.toLowerCase(Locale.ROOT));
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");

switch (expr.toLowerCase(Locale.ROOT)) {
case "upper":
Expand Down Expand Up @@ -238,16 +238,16 @@ public void processElement(String value, Context ctx, Collector<String> out) {
KafkaSinkDefinition s = (KafkaSinkDefinition) ir.sink;
String bootstrap = orElse(s.bootstrapServers,
(ir.source instanceof KafkaSourceDefinition) ? ((KafkaSourceDefinition) ir.source).bootstrapServers : null,
System.getenv("KAFKA_BOOTSTRAP"), "kafka:9093");
System.getenv("KAFKA_BOOTSTRAP"), "kafka:9092");

System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");
System.out.println("[KAFKA SINK] Configuration:");
System.out.println(" - bootstrapServers field from JSON: " + s.bootstrapServers);
System.out.println(" - Source bootstrapServers: " + ((ir.source instanceof KafkaSourceDefinition) ? ((KafkaSourceDefinition) ir.source).bootstrapServers : "N/A"));
System.out.println(" - KAFKA_BOOTSTRAP environment: " + System.getenv("KAFKA_BOOTSTRAP"));
System.out.println(" - FINAL bootstrap.servers: " + bootstrap);
System.out.println(" - Topic: " + s.topic);
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");

Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
Expand Down Expand Up @@ -442,13 +442,13 @@ public KafkaStringSource(String topic, Properties props) {

@Override
public void run(org.apache.flink.streaming.api.functions.source.legacy.SourceFunction.SourceContext<String> ctx) throws Exception {
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");
System.out.println("[KAFKA SOURCE] Starting consumer...");
System.out.println(" - Topic: " + topic);
System.out.println(" - Bootstrap servers: " + props.getProperty("bootstrap.servers"));
System.out.println(" - Group ID: " + props.getProperty("group.id"));
System.out.println(" - Auto offset reset: " + props.getProperty("auto.offset.reset"));
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");

try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props, new org.apache.kafka.common.serialization.StringDeserializer(), new org.apache.kafka.common.serialization.StringDeserializer())) {
System.out.println("[KAFKA SOURCE] ✓ Consumer created, subscribing to topic: " + topic);
Expand Down Expand Up @@ -506,11 +506,11 @@ public KafkaStringSink(String topic, Properties props) {
@Override
public void invoke(String value, org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction.Context context) {
if (producer == null) {
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");
System.out.println("[KAFKA SINK] Initializing producer...");
System.out.println(" - Topic: " + topic);
System.out.println(" - Bootstrap servers: " + props.getProperty("bootstrap.servers"));
System.out.println("════════════════════════════════════════════════════════════");
System.out.println("============================================================");

try {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props, new org.apache.kafka.common.serialization.StringSerializer(), new org.apache.kafka.common.serialization.StringSerializer());
Expand Down
20 changes: 13 additions & 7 deletions LocalTesting/LocalTesting.FlinkSqlAppHost/Ports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ public static class Ports
public const int JobManagerRpcPort = 8081; // Container REST/UI port
public const int SqlGatewayHostPort = 8083; // SQL Gateway REST API port
public const int GatewayHostPort = 8080; // Gateway HTTP port
public const int KafkaPort = 9093; // Kafka external listener for host (Aspire will map to dynamic port)

// Kafka connection string for containers within Docker network
// Used by Flink jobs running inside containers to reach Kafka
// CRITICAL: Kafka dual listener configuration:
// - PLAINTEXT (port 9092): Internal access for containers (kafka:9092)
// - PLAINTEXT_HOST (port 9093): External access from host (localhost:9093 -> dynamic port)
public const string KafkaContainerBootstrap = "kafka:9092";
// Kafka FIXED port configuration (no dynamic allocation)
// CRITICAL: Kafka dual listener setup with FIXED ports:
// - PLAINTEXT (port 9092): Internal container-to-container communication
// * Used by Flink TaskManager to connect: kafka:9092
// * Advertised listener: kafka:9092 (keeps containers on container network)
// - PLAINTEXT_HOST (port 9093): External host machine access
// * Used by tests and external clients: localhost:9093
// * Advertised listener: localhost:9093 (accessible from host)
// This ensures TaskManager always connects through kafka:9092 without dynamic port issues
public const int KafkaInternalPort = 9092; // Container network port
public const int KafkaExternalPort = 9093; // Host machine port
public const string KafkaContainerBootstrap = "kafka:9092"; // For Flink containers
public const string KafkaHostBootstrap = "localhost:9093"; // For tests/external access
}
Loading
Loading