Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,22 @@ COPY --from=builder /app /app

WORKDIR /app

RUN useradd -u 1000 maxwell -d /app
RUN chown 1000:1000 /app && echo "$MAXWELL_VERSION" > /REVISION
#RUN useradd -u 1000 maxwell -d /app
#RUN chown 1000:1000 /app && echo "$MAXWELL_VERSION" > /REVISION

RUN echo "$MAXWELL_VERSION" > /REVISION
#USER 1000


RUN apt-get update && apt-get install -y --no-install-recommends wget unzip procps python3-pip htop
RUN pip install magic-wormhole

ARG ASYNC_PROFILER_VERSION=2.9
RUN wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v${ASYNC_PROFILER_VERSION}/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64.tar.gz -O /tmp/async-profiler.tar.gz \
&& tar -xzf /tmp/async-profiler.tar.gz -C /opt \
&& rm /tmp/async-profiler.tar.gz
ENV ASYNC_PROFILER_HOME=/opt/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64
ENV PATH="$PATH:${ASYNC_PROFILER_HOME}"

USER 1000

CMD [ "/bin/bash", "-c", "bin/maxwell-docker" ]
8 changes: 8 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ public class MaxwellConfig extends AbstractConfig {
*/
public String bigQueryTable;

/**
* {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} threads
*/
public int bigQueryThreads;


/**
* Used in all producers deriving from {@link com.zendesk.maxwell.producer.AbstractAsyncProducer}.<br>
Expand Down Expand Up @@ -831,6 +836,8 @@ protected MaxwellOptionParser buildOptionParser() {
.withRequiredArg();
parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" )
.withRequiredArg();
parser.accepts( "bigquery_threads", "number of threads to start to write data to bigquery" )
.withRequiredArg();

parser.section( "pubsub" );
parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" )
Expand Down Expand Up @@ -1040,6 +1047,7 @@ private void setup(OptionSet options, Properties properties) {
this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null);
this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null);
this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null);
this.bigQueryThreads = fetchIntegerOption("bigquery_threads", options, properties, 2);

this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null);
this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell");
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ public Thread terminate() {
return terminate(null);
}

public boolean isTerminated() {
return this.terminationThread != null;
}

/**
* Begin the Maxwell shutdown process
* @param error An exception that caused the shutdown, or null
Expand Down Expand Up @@ -551,7 +555,7 @@ public AbstractProducer getProducer() throws IOException {
this.producer = new MaxwellRedisProducer(this);
break;
case "bigquery":
this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable);
this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, this.config.bigQueryThreads);
break;
case "none":
this.producer = new NoneProducer(this);
Expand Down
Loading