Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get location automatically and make is not writable the region retryable #217

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ script/
.idea/

fluentd-0.12

integration/log
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Because embedded gem dependency sometimes restricts ruby environment.
| private_key_path | string | yes (private_key) | no | nil | GCP Private Key file path |
| private_key_passphrase | string | yes (private_key) | no | nil | GCP Private Key Passphrase |
| json_key | string | yes (json_key) | no | nil | GCP JSON Key file path or JSON Key string |
| location | string | no | no | nil | BigQuery Data Location. The geographic location of the job. Required except for US and EU. |
| project | string | yes | yes | nil | |
| dataset | string | yes | yes | nil | |
| table | string | yes (either `tables`) | yes | nil | |
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/plugin/bigquery/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class Error < StandardError
RETRYABLE_ERROR_REASON = %w(backendError internalError rateLimitExceeded tableUnavailable).freeze
RETRYABLE_INSERT_ERRORS_REASON = %w(timeout backendError internalError rateLimitExceeded).freeze
RETRYABLE_STATUS_CODE = [500, 502, 503, 504]
REGION_NOT_WRITABLE_MESSAGE = -"is not writable in the region"

class << self
# @param e [Google::Apis::Error]
Expand All @@ -19,6 +20,10 @@ def wrap(e, message = nil)

# @param e [Google::Apis::Error]
def retryable_error?(e)
retryable_server_error?(e) || retryable_region_not_writable?(e)
end

def retryable_server_error?(e)
e.is_a?(Google::Apis::ServerError) && RETRYABLE_STATUS_CODE.include?(e.status_code)
end

Expand All @@ -30,6 +35,10 @@ def retryable_insert_errors_reason?(reason)
RETRYABLE_INSERT_ERRORS_REASON.include?(reason)
end

def retryable_region_not_writable?(e)
e.is_a?(Google::Apis::ClientError) && e.status_code == 400 && e.message.include?(REGION_NOT_WRITABLE_MESSAGE)
end

# Guard for instantiation
private :new
def inherited(subclass)
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/bigquery/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
end
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.debug "insert error: #{e.message}", status_code: e.respond_to?(:status_code) ? e.status_code : nil, reason: e.respond_to?(:reason) ? e.reason : nil
error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
wrapped = Fluent::BigQuery::Error.wrap(e)
if wrapped.retryable?
Expand All @@ -112,7 +113,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
raise wrapped
end

JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do
def as_hash(*keys)
if keys.empty?
to_h
Expand Down Expand Up @@ -161,7 +162,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
upload_source: upload_source,
content_type: "application/octet-stream",
)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id, res.job_reference.location)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message

Expand All @@ -175,7 +176,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
def fetch_load_job(job_reference)
project = job_reference.project_id
job_id = job_reference.job_id
location = @options[:location]
location = job_reference.location

res = client.get_job(project, job_id, location: location)
log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
Expand Down
4 changes: 0 additions & 4 deletions lib/fluent/plugin/out_bigquery_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class BigQueryBaseOutput < Output
config_param :private_key_path, :string, default: nil
config_param :private_key_passphrase, :string, default: 'notasecret', secret: true
config_param :json_key, default: nil, secret: true
# The geographic location of the job. Required except for US and EU.
# https://github.com/googleapis/google-api-ruby-client/blob/master/generated/google/apis/bigquery_v2/service.rb#L350
config_param :location, :string, default: nil

# see as simple reference
# https://github.com/abronte/BigQuery/blob/master/lib/bigquery.rb
Expand Down Expand Up @@ -135,7 +132,6 @@ def writer
private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
email: @email,
json_key: @json_key,
location: @location,
source_format: @source_format,
skip_invalid_rows: @skip_invalid_rows,
ignore_unknown_values: @ignore_unknown_values,
Expand Down
29 changes: 22 additions & 7 deletions test/plugin/test_out_bigquery_load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ def test_write
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down Expand Up @@ -118,7 +121,10 @@ def test_write_with_prevent_duplicate_load
},
job_reference: {project_id: 'yourproject_id', job_id: satisfy { |x| x =~ /fluentd_job_.*/}} ,
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down Expand Up @@ -155,10 +161,13 @@ def test_write_with_retryable_error
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end

mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
stub! do |s|
s.id { 'dummy_job_id' }
s.configuration.stub! do |_s|
Expand Down Expand Up @@ -238,10 +247,13 @@ def test_write_with_not_retryable_error
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end

mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
stub! do |s|
s.id { 'dummy_job_id' }
s.configuration.stub! do |_s|
Expand Down Expand Up @@ -318,7 +330,10 @@ def test_write_with_auto_create_table
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down
Loading