Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,22 @@ RUN apt-get update \
&& apt-get -y upgrade

COPY --from=builder /app /app
COPY --from=builder /REVISION /REVISION
# COPY --from=builder /REVISION /REVISION

WORKDIR /app

RUN useradd -u 1000 maxwell -d /app
RUN chown 1000:1000 /app
RUN echo "$MAXWELL_VERSION" > /REVISION
#USER 1000

USER 1000

RUN apt-get update && apt-get install -y --no-install-recommends wget unzip procps python3-pip htop
# RUN pipx install magic-wormhole

ARG ASYNC_PROFILER_VERSION=2.9
RUN wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v${ASYNC_PROFILER_VERSION}/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64.tar.gz -O /tmp/async-profiler.tar.gz \
&& tar -xzf /tmp/async-profiler.tar.gz -C /opt \
&& rm /tmp/async-profiler.tar.gz
ENV ASYNC_PROFILER_HOME=/opt/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64
ENV PATH="$PATH:${ASYNC_PROFILER_HOME}"

CMD [ "/bin/bash", "-c", "bin/maxwell-docker" ]
2 changes: 1 addition & 1 deletion bin/maxwell
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fi

CLASSPATH="$CLASSPATH:$lib_dir/*"

KAFKA_VERSION="2.7.0"
KAFKA_VERSION="3.7.2"

function use_kafka() {
wanted="$1"
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public class MaxwellConfig extends AbstractConfig {
public String bigQueryTable;


public int bigQueryThreads;


/**
* Used in all producers deriving from {@link com.zendesk.maxwell.producer.AbstractAsyncProducer}.<br>
* In milliseconds, time a message can spend in the {@link com.zendesk.maxwell.producer.InflightMessageList}
Expand Down Expand Up @@ -910,6 +913,8 @@ protected MaxwellOptionParser buildOptionParser() {
.withRequiredArg();
parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" )
.withRequiredArg();
parser.accepts( "bigquery_threads", "number of threads to start to write data to bigquery" )
.withRequiredArg();

parser.section( "pubsub" );
parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" )
Expand Down Expand Up @@ -1081,6 +1086,7 @@ private void setup(OptionSet options, Properties properties) {
this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null);
this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null);
this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null);
this.bigQueryThreads = fetchIntegerOption("bigquery_threads", options, properties, 2);

this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null);
this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell");
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ public Thread terminate() {
return terminate(null);
}

public boolean isTerminated() {
return this.terminationThread != null;
}

/**
* Begin the Maxwell shutdown process
* @param error An exception that caused the shutdown, or null
Expand Down Expand Up @@ -553,7 +557,7 @@ public AbstractProducer getProducer() throws IOException {
this.producer = new MaxwellRedisProducer(this);
break;
case "bigquery":
this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable);
this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, this.config.bigQueryThreads);
break;
case "none":
this.producer = new NoneProducer(this);
Expand Down
Loading