Spaces:
Running
Running
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. | |
import time | |
import pyarrow as pa | |
class HighLatencyReader(object):
    """Readable file-like wrapper that sleeps before every ``read`` call.

    Parameters
    ----------
    raw
        The wrapped stream. It must provide ``read(nbytes)``, ``close()``
        and a ``closed`` attribute.
    latency
        Delay, in seconds, passed to ``time.sleep`` before each read is
        forwarded to ``raw``.
    """

    def __init__(self, raw, latency):
        self.raw = raw
        self.latency = latency

    def close(self):
        """Close the wrapped stream."""
        self.raw.close()

    @property
    def closed(self):
        """Whether the wrapped stream is closed.

        This is a property rather than a method so that ``obj.closed``
        yields the underlying boolean, the same way this class reads
        ``self.raw.closed``. As a plain method, ``obj.closed`` would be a
        bound-method object, which is always truthy.
        """
        return self.raw.closed

    def read(self, nbytes=None):
        """Sleep for ``latency`` seconds, then return ``raw.read(nbytes)``."""
        time.sleep(self.latency)
        return self.raw.read(nbytes)
class HighLatencyWriter(object):
    """Writable file-like wrapper that sleeps before every ``write`` call.

    Parameters
    ----------
    raw
        The wrapped stream. It must provide ``write(data)``, ``close()``
        and a ``closed`` attribute.
    latency
        Delay, in seconds, passed to ``time.sleep`` before each write is
        forwarded to ``raw``.
    """

    def __init__(self, raw, latency):
        self.raw = raw
        self.latency = latency

    def close(self):
        """Close the wrapped stream."""
        self.raw.close()

    @property
    def closed(self):
        """Whether the wrapped stream is closed.

        This is a property rather than a method so that ``obj.closed``
        yields the underlying boolean, the same way this class reads
        ``self.raw.closed``. As a plain method, ``obj.closed`` would be a
        bound-method object, which is always truthy.
        """
        return self.raw.closed

    def write(self, data):
        """Sleep for ``latency`` seconds, then forward ``data`` to ``raw``.

        The result of ``raw.write`` is discarded; this method returns None.
        """
        time.sleep(self.latency)
        self.raw.write(data)
class BufferedIOHighLatency(object):
    """Benchmark buffered pyarrow streams layered over slow raw files.

    Each timed method moves ``total_size`` bytes in ``increment``-sized
    pieces through a pyarrow stream created with ``buffer_size`` bytes of
    buffering, on top of a wrapper that sleeps ``latency`` seconds per raw
    call.

    NOTE(review): ``param_names``/``params`` and the ``time_`` method prefix
    look like the airspeed velocity (asv) benchmark convention -- confirm
    against the benchmark runner.
    """

    increment = 1024  # bytes moved per write()/read() call
    total_size = 16 * (1 << 20)  # 16 MB
    buffer_size = 1 << 20  # 1 MB
    # Not read by the methods below: they use their ``latency`` argument.
    latency = 0.1  # 100ms

    param_names = ('latency',)
    params = [0, 0.01, 0.1]

    def time_buffered_writes(self, latency):
        """Write ``total_size`` bytes through a buffered slow writer."""
        sink = pa.BufferOutputStream()
        delayed_sink = HighLatencyWriter(sink, latency)
        stream = pa.output_stream(delayed_sink,
                                  buffer_size=self.buffer_size)
        chunk = b'x' * self.increment

        # Count down instead of up: same iteration count as comparing a
        # running total against total_size.
        remaining = self.total_size
        while remaining > 0:
            stream.write(chunk)
            remaining -= self.increment

        stream.flush()

    def time_buffered_reads(self, latency):
        """Read ``total_size`` bytes through a buffered slow reader."""
        payload = pa.py_buffer(b'x' * self.total_size)
        source = pa.input_stream(payload)
        delayed_source = HighLatencyReader(source, latency)
        stream = pa.input_stream(delayed_source,
                                 buffer_size=self.buffer_size)

        remaining = self.total_size
        while remaining > 0:
            stream.read(self.increment)
            remaining -= self.increment