Skip to content

Commit

Permalink
apacheGH-45358: [C++][Python] Add MemoryPool method to print statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 27, 2025
1 parent 4408e2b commit 8fcf877
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 0 deletions.
42 changes: 42 additions & 0 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ class DebugAllocator {
}
}

// Print allocator statistics by forwarding to the wrapped allocator's
// own PrintStats().
static void PrintStats() { WrappedAllocator::PrintStats(); }

private:
static Result<int64_t> RawSize(int64_t size) {
if (ARROW_PREDICT_FALSE(internal::AddWithOverflow(size, kOverhead, &size))) {
Expand Down Expand Up @@ -378,6 +382,12 @@ class SystemAllocator {
// The return value of malloc_trim is not an error but to inform
// you if memory was actually released or not, which we do not care about here
ARROW_UNUSED(malloc_trim(0));
#endif
}

// Print the C library allocator's statistics.
//
// Only does something when built against glibc (__GLIBC__ defined), where
// it calls malloc_stats(); with any other C library this is a no-op.
static void PrintStats() {
#if defined(__GLIBC__)
  malloc_stats();
#endif
}
};
Expand Down Expand Up @@ -430,6 +440,10 @@ class MimallocAllocator {
mi_free(ptr);
}
}

// Print mimalloc's statistics through mi_stats_print_out().
//
// Both the output callback and its user argument are passed as null.
// NOTE(review): mimalloc presumably falls back to its default output
// (stderr) in that case -- confirm against the mimalloc API reference.
static void PrintStats() { mi_stats_print_out(nullptr, nullptr); }
};

#endif // defined(ARROW_MIMALLOC)
Expand Down Expand Up @@ -512,6 +526,8 @@ class BaseMemoryPoolImpl : public MemoryPool {

// Delegate to the allocator backing this pool.
void ReleaseUnused() override {
  Allocator::ReleaseUnused();
}

// Print the statistics of the allocator backing this pool; what gets
// printed (if anything) is decided by Allocator::PrintStats().
void PrintStats() override {
  Allocator::PrintStats();
}

// Current allocated byte count, as tracked by this pool's stats_ object.
int64_t bytes_allocated() const override {
  return stats_.bytes_allocated();
}

// Maximum memory figure, as tracked by this pool's stats_ object.
int64_t max_memory() const override {
  return stats_.max_memory();
}
Expand Down Expand Up @@ -724,6 +740,14 @@ void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl;
}

// Pass the request straight through to the wrapped pool. No log line is
// written for this call.
void LoggingMemoryPool::ReleaseUnused() { pool_->ReleaseUnused(); }

// Print the wrapped pool's statistics. The call is forwarded as-is; no
// additional log line is written here.
void LoggingMemoryPool::PrintStats() { pool_->PrintStats(); }

int64_t LoggingMemoryPool::bytes_allocated() const {
int64_t nb_bytes = pool_->bytes_allocated();
std::cout << "bytes_allocated: " << nb_bytes << std::endl;
Expand Down Expand Up @@ -775,6 +799,16 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
stats_.DidFreeBytes(size);
}

// Ask the underlying pool to release its unused memory.
void ReleaseUnused() { pool_->ReleaseUnused(); }

// Print allocation statistics by delegating to the underlying pool.
//
// XXX: what gets printed describes the underlying allocator as a whole,
// not just the subset of allocations made through this ProxyMemoryPool.
void PrintStats() { pool_->PrintStats(); }

// Allocated byte count taken from the proxy's own stats_ object (not from
// the underlying pool).
int64_t bytes_allocated() const {
  return stats_.bytes_allocated();
}

// Maximum memory figure taken from the proxy's own stats_ object (not from
// the underlying pool).
int64_t max_memory() const {
  return stats_.max_memory();
}
Expand Down Expand Up @@ -809,6 +843,14 @@ void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
return impl_->Free(buffer, size, alignment);
}

// Forward to the private implementation object.
void ProxyMemoryPool::ReleaseUnused() { impl_->ReleaseUnused(); }

// Forward to the private implementation object, which in turn prints the
// statistics of the pool it proxies.
void ProxyMemoryPool::PrintStats() { impl_->PrintStats(); }

// Forward to the private implementation object.
int64_t ProxyMemoryPool::bytes_allocated() const {
  return impl_->bytes_allocated();
}

// Forward to the private implementation object.
int64_t ProxyMemoryPool::max_memory() const {
  return impl_->max_memory();
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ class ARROW_EXPORT MemoryPool {
/// unable to fulfill the request due to fragmentation.
virtual void ReleaseUnused() {}

/// Print statistics
///
/// Print allocation statistics on stderr. The output format is
/// implementation-specific. Not all memory pools implement this method:
/// this default implementation does nothing, so a pool that does not
/// override it prints no output.
virtual void PrintStats() {}

/// The number of bytes that were allocated and not yet free'd through
/// this allocator.
virtual int64_t bytes_allocated() const = 0;
Expand Down Expand Up @@ -187,6 +193,8 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
void ReleaseUnused() override;
void PrintStats() override;

int64_t bytes_allocated() const override;

Expand Down Expand Up @@ -219,6 +227,8 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
void ReleaseUnused() override;
void PrintStats() override;

int64_t bytes_allocated() const override;

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/memory_pool_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class JemallocAllocator {
uint8_t** ptr);
static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment);
static void ReleaseUnused();
static void PrintStats();
};

#endif // defined(ARROW_JEMALLOC)
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ void JemallocAllocator::ReleaseUnused() {
mallctl("arena." ARROW_STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", NULL, NULL, NULL, 0);
}

// Print jemalloc's statistics through malloc_stats_print().
//
// The write callback and its opaque argument are both null and the options
// string is empty.
// NOTE(review): jemalloc presumably uses its default message sink (stderr)
// and default report contents in that case -- confirm against the jemalloc
// manual.
void JemallocAllocator::PrintStats() {
  malloc_stats_print(/*write_cb=*/nullptr, /*cbopaque=*/nullptr, /*opts=*/"");
}

} // namespace internal

} // namespace memory_pool
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef cppclass CMemoryPool" arrow::MemoryPool":
int64_t bytes_allocated()
int64_t max_memory()
int64_t total_bytes_allocated()
int64_t num_allocations()
c_string backend_name()
void ReleaseUnused()
void PrintStats()

cdef cppclass CLoggingMemoryPool" arrow::LoggingMemoryPool"(CMemoryPool):
CLoggingMemoryPool(CMemoryPool*)
Expand Down
25 changes: 25 additions & 0 deletions python/pyarrow/memory.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ cdef class MemoryPool(_Weakrefable):
"""
return self.pool.bytes_allocated()

def total_bytes_allocated(self):
    """
    Total number of bytes that have been allocated from this memory pool,
    as reported by the underlying C++ pool.
    """
    total = self.pool.total_bytes_allocated()
    return total

def max_memory(self):
"""
Return the peak memory allocation in this memory pool.
Expand All @@ -69,6 +76,23 @@ cdef class MemoryPool(_Weakrefable):
ret = self.pool.max_memory()
return ret if ret >= 0 else None

def num_allocations(self):
    """
    Number of allocations or reallocations that were made using this
    memory pool, as reported by the underlying C++ pool.
    """
    count = self.pool.num_allocations()
    return count

def print_stats(self):
    """
    Print statistics about this memory pool.

    The output format is implementation-specific, and not every memory
    pool implements this method.
    """
    # The GIL is released for the duration of the C++ PrintStats() call.
    with nogil:
        self.pool.PrintStats()

@property
def backend_name(self):
"""
Expand All @@ -83,6 +107,7 @@ cdef class MemoryPool(_Weakrefable):
f"bytes_allocated={self.bytes_allocated()} "
f"max_memory={self.max_memory()}>")


cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
if memory_pool is None:
return c_get_memory_pool()
Expand Down
22 changes: 22 additions & 0 deletions python/pyarrow/tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,17 @@ def check_allocated_bytes(pool):
"""
allocated_before = pool.bytes_allocated()
max_mem_before = pool.max_memory()
num_allocations_before = pool.num_allocations()
with allocate_bytes(pool, 512):
assert pool.bytes_allocated() == allocated_before + 512
new_max_memory = pool.max_memory()
assert pool.max_memory() >= max_mem_before
num_allocations_after = pool.num_allocations()
assert num_allocations_after > num_allocations_before
assert num_allocations_after < num_allocations_before + 5
assert pool.bytes_allocated() == allocated_before
assert pool.max_memory() == new_max_memory
assert pool.num_allocations() == num_allocations_after


def test_default_allocated_bytes():
Expand Down Expand Up @@ -271,3 +276,20 @@ def test_debug_memory_pool_unknown(pool_factory):
"Valid values are 'abort', 'trap', 'warn', 'none'."
)
check_debug_memory_pool_disabled(pool_factory, env_value, msg)


# Check that MemoryPool.print_stats() can be called on every supported pool.
# The snippet below runs in a child interpreter with stdout and stderr
# redirected to pipes, so the parent can inspect what was written.
@pytest.mark.parametrize('pool_factory', supported_factories())
def test_print_stats(pool_factory):
# The child creates the pool by factory name, allocates one 64-byte buffer
# from it, then asks the pool to print its statistics.
code = f"""if 1:
import pyarrow as pa
pool = pa.{pool_factory.__name__}()
buf = pa.allocate_buffer(64, memory_pool=pool)
pool.print_stats()
"""
# check=True makes the test fail if the child exits with a non-zero status.
res = subprocess.run([sys.executable, "-c", code], check=True,
universal_newlines=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if sys.platform == "linux":
# On Linux at least, all memory pools should emit statistics
# (only stderr is checked; on other platforms the test only verifies
# that the call completes successfully).
assert res.stderr.strip() != ""

0 comments on commit 8fcf877

Please sign in to comment.