diff --git a/.gitignore b/.gitignore
index e10e30a..10bee76 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,5 @@
 script/
 .idea/
 fluentd-0.12
+
+integration/log
diff --git a/README.md b/README.md
index 412bb4c..c2d4ac8 100644
--- a/README.md
+++ b/README.md
@@ -45,7 +45,6 @@ Because embedded gem dependency sometimes restricts ruby environment.
 | private_key_path | string | yes (private_key) | no | nil | GCP Private Key file path |
 | private_key_passphrase | string | yes (private_key) | no | nil | GCP Private Key Passphrase |
 | json_key | string | yes (json_key) | no | nil | GCP JSON Key file path or JSON Key string |
-| location | string | no | no | nil | BigQuery Data Location. The geographic location of the job. Required except for US and EU. |
 | project | string | yes | yes | nil | |
 | dataset | string | yes | yes | nil | |
 | table | string | yes (either `tables`) | yes | nil | |
diff --git a/lib/fluent/plugin/bigquery/errors.rb b/lib/fluent/plugin/bigquery/errors.rb
index 66365a1..14acfa1 100644
--- a/lib/fluent/plugin/bigquery/errors.rb
+++ b/lib/fluent/plugin/bigquery/errors.rb
@@ -5,6 +5,7 @@ class Error < StandardError
       RETRYABLE_ERROR_REASON = %w(backendError internalError rateLimitExceeded tableUnavailable).freeze
       RETRYABLE_INSERT_ERRORS_REASON = %w(timeout backendError internalError rateLimitExceeded).freeze
       RETRYABLE_STATUS_CODE = [500, 502, 503, 504]
+      REGION_NOT_WRITABLE_MESSAGE = -"is not writable in the region"
 
       class << self
         # @param e [Google::Apis::Error]
@@ -19,6 +20,10 @@ def wrap(e, message = nil)
 
         # @param e [Google::Apis::Error]
         def retryable_error?(e)
+          retryable_server_error?(e) || retryable_region_not_writable?(e)
+        end
+
+        def retryable_server_error?(e)
           e.is_a?(Google::Apis::ServerError) && RETRYABLE_STATUS_CODE.include?(e.status_code)
         end
 
@@ -30,6 +35,10 @@ def retryable_insert_errors_reason?(reason)
           RETRYABLE_INSERT_ERRORS_REASON.include?(reason)
         end
 
+        def retryable_region_not_writable?(e)
+          e.is_a?(Google::Apis::ClientError) && e.status_code == 400 && e.message.include?(REGION_NOT_WRITABLE_MESSAGE)
+        end
+
         # Guard for instantiation
         private :new
         def inherited(subclass)
diff --git a/lib/fluent/plugin/bigquery/writer.rb b/lib/fluent/plugin/bigquery/writer.rb
index ee6c79c..0fe89b4 100644
--- a/lib/fluent/plugin/bigquery/writer.rb
+++ b/lib/fluent/plugin/bigquery/writer.rb
@@ -101,6 +101,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
           end
         end
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+        log.debug "insert error: #{e.message}", status_code: e.respond_to?(:status_code) ? e.status_code : nil, reason: e.respond_to?(:reason) ? e.reason : nil
         error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
         wrapped = Fluent::BigQuery::Error.wrap(e)
         if wrapped.retryable?
@@ -112,7 +113,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
         raise wrapped
       end
 
-      JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
+      JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do
         def as_hash(*keys)
           if keys.empty?
             to_h
@@ -161,7 +162,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
           upload_source: upload_source,
           content_type: "application/octet-stream",
         )
-        JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
+        JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id, res.job_reference.location)
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
 
@@ -175,7 +176,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
       def fetch_load_job(job_reference)
         project = job_reference.project_id
         job_id = job_reference.job_id
-        location = @options[:location]
+        location = job_reference.location
 
         res = client.get_job(project, job_id, location: location)
         log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
diff --git a/lib/fluent/plugin/out_bigquery_base.rb b/lib/fluent/plugin/out_bigquery_base.rb
index d83aceb..74095b8 100644
--- a/lib/fluent/plugin/out_bigquery_base.rb
+++ b/lib/fluent/plugin/out_bigquery_base.rb
@@ -29,9 +29,6 @@ class BigQueryBaseOutput < Output
       config_param :private_key_path, :string, default: nil
       config_param :private_key_passphrase, :string, default: 'notasecret', secret: true
       config_param :json_key, default: nil, secret: true
-      # The geographic location of the job. Required except for US and EU.
-      # https://github.com/googleapis/google-api-ruby-client/blob/master/generated/google/apis/bigquery_v2/service.rb#L350
-      config_param :location, :string, default: nil
 
       # see as simple reference
       # https://github.com/abronte/BigQuery/blob/master/lib/bigquery.rb
@@ -135,7 +132,6 @@ def writer
         private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
         email: @email,
         json_key: @json_key,
-        location: @location,
         source_format: @source_format,
         skip_invalid_rows: @skip_invalid_rows,
         ignore_unknown_values: @ignore_unknown_values,
diff --git a/test/plugin/test_out_bigquery_load.rb b/test/plugin/test_out_bigquery_load.rb
index 63be574..1c5d225 100644
--- a/test/plugin/test_out_bigquery_load.rb
+++ b/test/plugin/test_out_bigquery_load.rb
@@ -65,7 +65,10 @@ def test_write
           }
         }
       }, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
-        stub!.job_reference.stub!.job_id { "dummy_job_id" }
+        stub!.job_reference.stub! do |s|
+          s.job_id { "dummy_job_id" }
+          s.location { "us" }
+        end
       end
     end
 
@@ -118,7 +121,10 @@ def test_write_with_prevent_duplicate_load
         },
         job_reference: {project_id: 'yourproject_id', job_id: satisfy { |x| x =~ /fluentd_job_.*/}} ,
       }, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
-        stub!.job_reference.stub!.job_id { "dummy_job_id" }
+        stub!.job_reference.stub! do |s|
+          s.job_id { "dummy_job_id" }
+          s.location { "us" }
+        end
       end
     end
 
@@ -155,10 +161,13 @@ def test_write_with_retryable_error
           }
         }
       }, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
-        stub!.job_reference.stub!.job_id { "dummy_job_id" }
+        stub!.job_reference.stub! do |s|
+          s.job_id { "dummy_job_id" }
+          s.location { "us" }
+        end
       end
 
-      mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
+      mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
         stub! do |s|
           s.id { 'dummy_job_id' }
           s.configuration.stub! do |_s|
@@ -238,10 +247,13 @@ def test_write_with_not_retryable_error
           }
         }
       }, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
-        stub!.job_reference.stub!.job_id { "dummy_job_id" }
+        stub!.job_reference.stub! do |s|
+          s.job_id { "dummy_job_id" }
+          s.location { "us" }
+        end
       end
 
-      mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
+      mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
         stub! do |s|
           s.id { 'dummy_job_id' }
           s.configuration.stub! do |_s|
@@ -318,7 +330,10 @@ def test_write_with_auto_create_table
           }
         }
       }, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
-        stub!.job_reference.stub!.job_id { "dummy_job_id" }
+        stub!.job_reference.stub! do |s|
+          s.job_id { "dummy_job_id" }
+          s.location { "us" }
+        end
       end
     end
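
A note on the retryable-error change above: `Fluent::BigQuery::Error.wrap` now also treats a 400 `Google::Apis::ClientError` whose message contains "is not writable in the region" as retryable, in addition to the existing 5xx handling. A minimal sketch of the resulting classification, assuming the plugin and the `google-api-client` gem are on the load path (the error message text below is illustrative; only the quoted fragment matters):

```ruby
require "google/apis/bigquery_v2"
require "fluent/plugin/bigquery/errors"

# A client error carrying the "is not writable in the region" fragment is now
# wrapped as a retryable error rather than a permanent failure.
regional = Google::Apis::ClientError.new(
  "invalid: Table yourproject_id:yourdataset_id.foo is not writable in the region",
  status_code: 400
)
Fluent::BigQuery::Error.wrap(regional).retryable? # => true

# Other 400s keep their previous, non-retryable classification.
other = Google::Apis::ClientError.new("invalid: bad schema", status_code: 400)
Fluent::BigQuery::Error.wrap(other).retryable? # => false
```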
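
On the `location` plumbing itself: instead of the removed `location` config_param, the writer now keeps whatever location BigQuery reports back for the created load job (`res.job_reference.location`) on the `JobReference` and hands it to `get_job` when polling. A rough illustration of the extended struct, with purely dummy values:

```ruby
require "fluent/plugin/bigquery/writer"

# JobReference gained a :location member; in real use it is filled from the
# jobs.insert response and later passed to client.get_job(..., location: ...).
ref = Fluent::BigQuery::Writer::JobReference.new(
  "chunk-id", "abcd1234",                    # chunk_id, chunk_id_hex (dummies)
  "yourproject_id", "yourdataset_id", "foo", # project_id, dataset_id, table_id
  "fluentd_job_dummy", "asia-northeast1"     # job_id, location (dummies)
)

ref.location # => "asia-northeast1"
ref.as_hash(:project_id, :dataset_id, :table_id) # => hash limited to those members, as logged in fetch_load_job
```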