Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel upload, ranged get #215

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions blob/lib/azure/storage/blob/blob.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#--------------------------------------------------------------------------
require 'thread'

module Azure::Storage
module Blob
Expand Down Expand Up @@ -82,11 +83,17 @@ def initialize
# - The lease ID specified in the request matches that of the blob.
# If this header is specified and both of these conditions are not met, the request will fail
# and the Get Blob operation will fail with status code 412 (Precondition Failed).
# * +:parallel_threshold+ - Integer. Complete the request concurrently if the specified range >= this threshold.
# Takes precedence over the storage_blob_parallel_threshold client option.
#
# See http://msdn.microsoft.com/en-us/library/azure/dd179440.aspx
#
# Returns a blob and the blob body
def get_blob(container, blob, options = {})
if options[:end_range].to_i - options[:start_range].to_i >= parallel_threshold(options)
return get_blob_parallel(container, blob, options)
end

query = {}
StorageService.with_query query, "snapshot", options[:snapshot]
StorageService.with_query query, "timeout", options[:timeout] if options[:timeout]
Expand Down Expand Up @@ -928,5 +935,50 @@ def delete_blob(container, blob, options = {})
call(:delete, uri, nil, headers, options)
nil
end

# Private: executes get_blob with threads
#
# Returns a blob and the blob body
private
def get_blob_parallel(container, blob, options)
blob_size = get_blob_properties(container, blob, options).properties[:content_length]
end_range = [options[:end_range], blob_size].min
start_range = options[:start_range]
total_bytes = end_range - start_range

# if after calculating the real end_range it is below threshold, return
if total_bytes < parallel_threshold(options)
return get_blob(container, blob, options.merge({parallel_threshold: Float::INFINITY, start_range: start_range, end_range: end_range}))
end

thread_count = client.storage_blob_parallel_threads
single_thread_chunk = (total_bytes.to_f / thread_count).ceil
ranges = (1..thread_count).to_a.map do |i|
start = options[:start_range] + ((i - 1) * single_thread_chunk)
fin = [start + single_thread_chunk - 1, options[:end_range]].min
[start, fin]
end

threads = []
ranges.each do |start, fin|
thread = Thread.new do
get_blob(container, blob, options.merge({parallel_threshold: Float::INFINITY, start_range: start, end_range: fin}))
end
thread.abort_on_exception = true
threads << thread
end

# each get_blob returns a [result, body]. Transpose into results, bodies
results, bodies = threads.map(&:value).transpose
result = results.first
result.properties[:content_length] = total_bytes if result
return result, bodies.join
end

# Private: The number of bytes used to determine if a request should be parallelized
private
def parallel_threshold(options)
options[:parallel_threshold] || client.storage_blob_parallel_threshold || Float::INFINITY
end
end
end
25 changes: 20 additions & 5 deletions blob/lib/azure/storage/blob/block.rb
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,25 @@ def create_block_blob_multiple_put(container, blob, content, size, options = {})
# Get the number of blocks
block_count = (Float(size) / Float(block_size)).ceil
block_list = []
for block_id in 0...block_count
id = block_id.to_s.rjust(6, "0")
put_blob_block(container, blob, id, content.read(block_size), timeout: options[:timeout], lease_id: options[:lease_id])
block_list.push([id])

# Split the block list into groups of threads and upload in parallel
max_thread_count = client.options[:storage_blob_parallel_threads] || 1
(0..block_count - 1).to_a.each_slice(max_thread_count) do |block_slice|
threads = []
thread_count = block_slice.length
block_slice_data = content.read(block_size * thread_count)

block_slice.each_with_index do |block_id, index_in_block_slice|
thread = Thread.new do
id = block_id.to_s.rjust(6, "0")
put_blob_block(container, blob, id, block_slice_data.slice(index_in_block_slice * block_size, block_size), timeout: options[:timeout], lease_id: options[:lease_id])
[id]
end
thread.abort_on_exception = true
threads << thread
end

block_list.concat threads.map(&:value).sort
end

# Commit the blocks put
Expand Down Expand Up @@ -521,7 +536,7 @@ def get_block_size(size)
if size > BlobConstants::MAX_BLOCK_BLOB_SIZE
raise ArgumentError, "Block blob size should be less than #{BlobConstants::MAX_BLOCK_BLOB_SIZE} bytes in size"
elsif (size / BlobConstants::MAX_BLOCK_COUNT) < BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
client.options[:storage_blob_write_block_size] || BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
else
BlobConstants::MAX_BLOCK_SIZE
end
Expand Down
2 changes: 1 addition & 1 deletion blob/lib/azure/storage/blob/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ module BlobConstants
DEFAULT_SINGLE_BLOB_PUT_THRESHOLD_IN_BYTES = 128 * 1024 * 1024

# The default write block size, in bytes, used by blob streams.
DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES = 4 * 1024 * 1024
DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES = 5 * 1024 * 1024

# The maximum size of a single block.
MAX_BLOCK_SIZE = 100 * 1024 * 1024
Expand Down
6 changes: 6 additions & 0 deletions common/lib/azure/storage/common/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ class Client
# * +:storage_access_key+ - Base64 String. The access key of the storage account.
# * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
# * +:storage_blob_host+ - String. Specified Blob serivce endpoint or hostname
# * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range greater than or equal to this value.
# * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
# * +:storage_table_host+ - String. Specified Table serivce endpoint or hostname
# * +:storage_queue_host+ - String. Specified Queue serivce endpoint or hostname
# * +:storage_dns_suffix+ - String. The suffix of a regional Storage Serivce, to
# * +:http_pool_size+ - Integer. Persistent HTTP Client pool size.
# * +:default_endpoints_protocol+ - String. http or https
# * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
# * +:ca_file+ - String. File path of the CA file if having issue with SSL
Expand Down Expand Up @@ -97,9 +100,12 @@ class << self
# * +:storage_access_key+ - Base64 String. The access key of the storage account.
# * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
# * +:storage_blob_host+ - String. Specified Blob service endpoint or hostname
# * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range greater than or equal to this value.
# * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
# * +:storage_table_host+ - String. Specified Table service endpoint or hostname
# * +:storage_queue_host+ - String. Specified Queue service endpoint or hostname
# * +:storage_dns_suffix+ - String. The suffix of a regional Storage Service, to
# * +:http_pool_size+ - Integer. Persistent HTTP Client pool size.
# * +:default_endpoints_protocol+ - String. http or https
# * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
# * +:ca_file+ - String. File path of the CA file if having issue with SSL
Expand Down
22 changes: 20 additions & 2 deletions common/lib/azure/storage/common/client_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

module Azure::Storage::Common
module ClientOptions
attr_accessor :ca_file, :ssl_version, :ssl_min_version, :ssl_max_version
attr_accessor :ca_file, :ssl_version, :ssl_min_version, :ssl_max_version, :http_pool_size

# Public: Reset options for [Azure::Storage::Common::Client]
#
Expand All @@ -49,9 +49,13 @@ module ClientOptions
# * +:storage_access_key+ - Base64 String. The access key of the storage account.
# * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
# * +:storage_blob_host+ - String. Specified Blob serivce endpoint or hostname
# * +:storage_blob_write_block_size+ - Integer. Block size in bytes for blob writes. Default is 5MB.
# * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range greater than or equal to this value.
# * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
# * +:storage_table_host+ - String. Specified Table serivce endpoint or hostname
# * +:storage_queue_host+ - String. Specified Queue serivce endpoint or hostname
# * +:storage_dns_suffix+ - String. The suffix of a regional Storage Serivce, to
# * +:http_pool_size+ - Integer. Persistent HTTP Client pool size. Default is 5.
# * +:default_endpoints_protocol+ - String. http or https
# * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
# * +:ca_file+ - String. File path of the CA file if having issue with SSL
Expand Down Expand Up @@ -91,6 +95,7 @@ def reset!(options = {})
@ssl_version = options.delete(:ssl_version)
@ssl_min_version = options.delete(:ssl_min_version)
@ssl_max_version = options.delete(:ssl_max_version)
@http_pool_size = options.delete(:http_pool_size)
@options = filter(options)
self.send(:reset_config!, @options) if self.respond_to?(:reset_config!)
self
Expand Down Expand Up @@ -120,6 +125,9 @@ def self.valid_options
:storage_connection_string,
:storage_sas_token,
:storage_blob_host,
:storage_blob_write_block_size,
:storage_blob_parallel_threshold,
:storage_blob_parallel_threads,
:storage_table_host,
:storage_queue_host,
:storage_file_host,
Expand All @@ -139,6 +147,9 @@ def self.env_vars_mapping
"AZURE_STORAGE_ACCESS_KEY" => :storage_access_key,
"AZURE_STORAGE_CONNECTION_STRING" => :storage_connection_string,
"AZURE_STORAGE_BLOB_HOST" => :storage_blob_host,
"AZURE_STORAGE_WRITE_BLOCK_SIZE" => :storage_blob_write_block_size,
"AZURE_STORAGE_BLOB_PARALLEL_THRESHOLD" => :storage_blob_parallel_threshold,
"AZURE_STORAGE_BLOB_PARALLEL_THREADS" => :storage_blob_parallel_threads,
"AZURE_STORAGE_TABLE_HOST" => :storage_table_host,
"AZURE_STORAGE_QUEUE_HOST" => :storage_queue_host,
"AZURE_STORAGE_FILE_HOST" => :storage_file_host,
Expand All @@ -154,10 +165,14 @@ def self.connection_string_mapping
@connection_string_mapping ||= {
"UseDevelopmentStorage" => :use_development_storage,
"DevelopmentStorageProxyUri" => :development_storage_proxy_uri,
"HttpPoolSize" => :http_pool_size,
"DefaultEndpointsProtocol" => :default_endpoints_protocol,
"AccountName" => :storage_account_name,
"AccountKey" => :storage_access_key,
"BlobEndpoint" => :storage_blob_host,
"BlobParallelThreshold" => :storage_blob_parallel_threshold,
"BlobParallelThreads" => :storage_blob_parallel_threads,
"BlobWriteBlockSize" => :storage_blob_write_block_size,
"TableEndpoint" => :storage_table_host,
"QueueEndpoint" => :storage_queue_host,
"FileEndpoint" => :storage_file_host,
Expand Down Expand Up @@ -300,7 +315,7 @@ def validated_options(opts, requirements = {})
required = requirements[:required] || []
at_least_one = requirements[:at_least_one] || []
only_one = requirements[:only_one] || []
optional = requirements[:optional] || []
optional = (requirements[:optional] || []).concat([:storage_blob_write_block_size, :storage_blob_parallel_threshold, :storage_blob_parallel_threads])

raise InvalidOptionsError, "Not all required keys are provided: #{required}" if required.any? { |k| !opts.key? k }
raise InvalidOptionsError, "Only one of #{only_one} is required" unless only_one.length == 0 || only_one.count { |k| opts.key? k } == 1
Expand All @@ -313,6 +328,9 @@ def validated_options(opts, requirements = {})
storage_access_key: is_base64_encoded,
storage_sas_token: lambda { |i| i.is_a?(String) },
storage_blob_host: is_url,
storage_blob_write_block_size: lambda { |i| i.is_a?(Integer) },
storage_blob_parallel_threshold: lambda { |i| i.is_a?(Integer) },
storage_blob_parallel_threads: lambda { |i| i.is_a?(Integer) },
storage_table_host: is_url,
storage_queue_host: is_url,
storage_file_host: is_url,
Expand Down
14 changes: 13 additions & 1 deletion common/lib/azure/storage/common/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ module Configurable
# emulator). This should be the complete host, including http:// at the
# start. When using the emulator, make sure to include your account name at
# the end.
# @!attribute storage_blob_write_block_size
# @return [Integer] Set the block size in bytes for blob writes
# @!attribute storage_blob_parallel_threshold
# @return [Integer] Set the range threshold in bytes for blob operation parallelization.
# @!attribute storage_blob_parallel_threads
# @return [Integer] Set the number of threads for blob operation parallelization.
# @!attribute storage_table_host
# @return [String] Set the host for the Table service. Only set this if you want
# something custom (like, for example, to point this to a LocalStorage
Expand All @@ -55,7 +61,10 @@ module Configurable
attr_accessor :storage_access_key,
:storage_account_name,
:storage_connection_string,
:storage_sas_token
:storage_sas_token,
:storage_blob_write_block_size,
:storage_blob_parallel_threshold,
:storage_blob_parallel_threads

attr_writer :storage_table_host,
:storage_blob_host,
Expand All @@ -79,6 +88,9 @@ def keys
:storage_sas_token,
:storage_table_host,
:storage_blob_host,
:storage_blob_write_block_size,
:storage_blob_parallel_threshold,
:storage_blob_parallel_threads,
:storage_queue_host,
:storage_file_host,
:signer
Expand Down
2 changes: 1 addition & 1 deletion common/lib/azure/storage/common/core/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def build_http(uri)
end || nil
Faraday.new(uri, ssl: ssl_options, proxy: proxy_options) do |conn|
conn.use FaradayMiddleware::FollowRedirects
conn.adapter :net_http_persistent, pool_size: 5 do |http|
conn.adapter :net_http_persistent, pool_size: (self.http_pool_size || 5) do |http|
# yields Net::HTTP::Persistent
http.idle_timeout = 100
end
Expand Down
12 changes: 12 additions & 0 deletions common/lib/azure/storage/common/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ def storage_blob_host
ENV["AZURE_STORAGE_BLOB_HOST"]
end

def storage_blob_write_block_size
ENV["AZURE_STORAGE_BLOB_WRITE_BLOCK_SIZE"]&.to_i
end

def storage_blob_parallel_threads
ENV["AZURE_STORAGE_BLOB_PARALLEL_THREADS"]&.to_i
end

def storage_blob_parallel_threshold
ENV["AZURE_STORAGE_BLOB_PARALLEL_THRESHOLD"]&.to_i
end

# Default storage queue host
# @return [String]
def storage_queue_host
Expand Down
25 changes: 25 additions & 0 deletions test/integration/blob/block_blob_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@

describe Azure::Storage::Blob::BlobService do
subject { Azure::Storage::Blob::BlobService.create(SERVICE_CREATE_OPTIONS()) }
let(:parallel_subject) {
Azure::Storage::Blob::BlobService::create(
{
storage_account_name: "mockaccount",
storage_access_key: "YWNjZXNzLWtleQ==",
storage_blob_parallel_threads: 2,
storage_blob_parallel_threshold: 4,
storage_blob_write_block_size: 4
})
}

after { ContainerNameHelper.clean }

let(:container_name) { ContainerNameHelper.name }
Expand Down Expand Up @@ -87,6 +98,20 @@
_(blob.properties[:content_type]).must_equal "text/plain; charset=UTF-8"
end

it "creates a block that is larger than single upload in parallel" do
options = {}
options[:single_upload_threshold] = Azure::Storage::Blob::BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
content_50_mb = SecureRandom.random_bytes(50 * 1024 * 1024)
content_50_mb.force_encoding "utf-8"
blob_name = BlobNameHelper.name
blob = parallel_subject.create_block_blob container_name, blob_name, content_50_mb, options
_(blob.name).must_equal blob_name
# No content length if single upload
_(blob.properties[:content_length]).must_equal 50 * 1024 * 1024
_(blob.properties[:content_type]).must_equal "text/plain; charset=UTF-8"
end


it "should create a block blob with spaces in name" do
blob_name = "blob with spaces"
blob = subject.create_block_blob container_name, blob_name, "content"
Expand Down
Loading