Package.Fetch: fix Git package fetching

This commit works around #18967 by adding an `AccumulatingReader`, which
accumulates data read from the underlying packfile, and by keeping track
of the position in the packfile and hash/checksum information separately
rather than using reader composition. That is, the packfile position and
hashes/checksums are updated with the accumulated read history data only
after we can determine what data has actually been used by the
decompressor rather than merely being buffered.

The only addition to the standard library APIs to support this change is
the `unreadBytes` function in `std.compress.flate.Inflate`, which allows
the user to determine how many bytes have been read only for buffering
and not used as part of compressed data.

These changes can be reverted if #18967 is resolved with a decompressor
that reads precisely only the number of bytes needed for decompression.
This commit is contained in:
Ian Johnson 2024-02-18 16:39:03 -05:00 committed by Andrew Kelley
parent 5c25ad0fda
commit 80f3ef6e14
2 changed files with 163 additions and 24 deletions

View File

@ -288,6 +288,14 @@ pub fn Inflate(comptime container: Container, comptime ReaderType: type) type {
}
}
/// Returns the number of bytes that have been read from the internal
/// reader but not yet consumed by the decompressor.
pub fn unreadBytes(self: Self) usize {
// There can be no error here: the denominator is not zero, and
// overflow is not possible since the type is unsigned.
return std.math.divCeil(usize, self.bits.nbits, 8) catch unreachable;
}
// Iterator interface
/// Can be used in iterator like loop without memcpy to another buffer:

View File

@ -1091,6 +1091,86 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
try index_writer.writeAll(&index_checksum);
}
/// A reader that stores read data in a growable internal buffer. The read
/// position can be rewound to allow previously read data to be read again.
fn AccumulatingReader(comptime ReaderType: type) type {
return struct {
child_reader: ReaderType,
buffer: std.ArrayListUnmanaged(u8) = .{},
/// The position in `buffer` from which reads should start, returning
/// buffered data. If this is `buffer.items.len`, data will be read from
/// `child_reader` instead.
read_start: usize = 0,
allocator: Allocator,
const Self = @This();
fn deinit(self: *Self) void {
self.buffer.deinit(self.allocator);
self.* = undefined;
}
const ReadError = ReaderType.Error || Allocator.Error;
const Reader = std.io.Reader(*Self, ReadError, read);
fn read(self: *Self, buf: []u8) ReadError!usize {
if (self.read_start < self.buffer.items.len) {
// Previously buffered data is available and should be used
// before reading more data from the underlying reader.
const available = self.buffer.items.len - self.read_start;
const count = @min(available, buf.len);
@memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]);
self.read_start += count;
return count;
}
try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
const read_buffer = self.buffer.unusedCapacitySlice();
const count = try self.child_reader.read(read_buffer[0..buf.len]);
@memcpy(buf[0..count], read_buffer[0..count]);
self.buffer.items.len += count;
self.read_start += count;
return count;
}
fn reader(self: *Self) Reader {
return .{ .context = self };
}
/// Returns a slice of the buffered data that has already been read,
/// except the last `count_before_end` bytes.
fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
assert(count_before_end <= self.read_start);
return self.buffer.items[0 .. self.read_start - count_before_end];
}
/// Discards the first `count` bytes of buffered data.
fn discard(self: *Self, count: usize) void {
assert(count <= self.buffer.items.len);
const retain = self.buffer.items.len - count;
mem.copyForwards(
u8,
self.buffer.items[0..retain],
self.buffer.items[count..][0..retain],
);
self.buffer.items.len = retain;
self.read_start -= @min(self.read_start, count);
}
/// Rewinds the read position to the beginning of buffered data.
fn rewind(self: *Self) void {
self.read_start = 0;
}
};
}
fn accumulatingReader(
allocator: Allocator,
reader: anytype,
) AccumulatingReader(@TypeOf(reader)) {
return .{ .child_reader = reader, .allocator = allocator };
}
/// Performs the first pass over the packfile data for index construction.
/// This will index all non-delta objects, queue delta objects for further
/// processing, and return the pack checksum (which is part of the index
@ -1102,59 +1182,106 @@ fn indexPackFirstPass(
pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
) ![Sha1.digest_length]u8 {
var pack_buffered_reader = std.io.bufferedReader(pack.reader());
var pack_counting_reader = std.io.countingReader(pack_buffered_reader.reader());
var pack_hashed_reader = std.compress.hashedReader(pack_counting_reader.reader(), Sha1.init(.{}));
const pack_reader = pack_hashed_reader.reader();
var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
defer pack_accumulating_reader.deinit();
var pack_position: usize = 0;
var pack_hash = Sha1.init(.{});
const pack_reader = pack_accumulating_reader.reader();
const pack_header = try PackHeader.read(pack_reader);
const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
pack_position += pack_header_bytes.len;
pack_hash.update(pack_header_bytes);
pack_accumulating_reader.discard(pack_header_bytes.len);
var current_entry: u32 = 0;
while (current_entry < pack_header.total_objects) : (current_entry += 1) {
const entry_offset = pack_counting_reader.bytes_read;
var entry_crc32_reader = std.compress.hashedReader(pack_reader, std.hash.Crc32.init());
const entry_header = try EntryHeader.read(entry_crc32_reader.reader());
const entry_offset = pack_position;
var entry_crc32 = std.hash.Crc32.init();
const entry_header = try EntryHeader.read(pack_reader);
const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
pack_position += entry_header_bytes.len;
pack_hash.update(entry_header_bytes);
entry_crc32.update(entry_header_bytes);
pack_accumulating_reader.discard(entry_header_bytes.len);
switch (entry_header) {
inline .commit, .tree, .blob, .tag => |object, tag| {
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
.commit, .tree, .blob, .tag => |object| {
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
var entry_data_size: usize = 0;
var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
const entry_writer = entry_hashed_writer.writer();
// The object header is not included in the pack data but is
// part of the object's ID
try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
try fifo.pump(entry_counting_reader.reader(), entry_writer);
if (entry_counting_reader.bytes_read != object.uncompressed_length) {
try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });
while (try entry_decompress_stream.next()) |decompressed_data| {
entry_data_size += decompressed_data.len;
try entry_writer.writeAll(decompressed_data);
const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += compressed_bytes.len;
pack_hash.update(compressed_bytes);
entry_crc32.update(compressed_bytes);
pack_accumulating_reader.discard(compressed_bytes.len);
}
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += footer_bytes.len;
pack_hash.update(footer_bytes);
entry_crc32.update(footer_bytes);
pack_accumulating_reader.discard(footer_bytes.len);
pack_accumulating_reader.rewind();
if (entry_data_size != object.uncompressed_length) {
return error.InvalidObject;
}
const oid = entry_hashed_writer.hasher.finalResult();
try index_entries.put(allocator, oid, .{
.offset = entry_offset,
.crc32 = entry_crc32_reader.hasher.final(),
.crc32 = entry_crc32.final(),
});
},
inline .ofs_delta, .ref_delta => |delta| {
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
var entry_data_size: usize = 0;
while (try entry_decompress_stream.next()) |decompressed_data| {
entry_data_size += decompressed_data.len;
const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += compressed_bytes.len;
pack_hash.update(compressed_bytes);
entry_crc32.update(compressed_bytes);
pack_accumulating_reader.discard(compressed_bytes.len);
}
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += footer_bytes.len;
pack_hash.update(footer_bytes);
entry_crc32.update(footer_bytes);
pack_accumulating_reader.discard(footer_bytes.len);
pack_accumulating_reader.rewind();
if (entry_data_size != delta.uncompressed_length) {
return error.InvalidObject;
}
try pending_deltas.append(allocator, .{
.offset = entry_offset,
.crc32 = entry_crc32_reader.hasher.final(),
.crc32 = entry_crc32.final(),
});
},
}
}
const pack_checksum = pack_hashed_reader.hasher.finalResult();
const recorded_checksum = try pack_buffered_reader.reader().readBytesNoEof(Sha1.digest_length);
const pack_checksum = pack_hash.finalResult();
const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
return error.CorruptedPack;
}
_ = pack_buffered_reader.reader().readByte() catch |e| switch (e) {
_ = pack_reader.readByte() catch |e| switch (e) {
error.EndOfStream => return pack_checksum,
else => |other| return other,
};
@ -1385,7 +1512,11 @@ test "packfile indexing and checkout" {
defer worktree.cleanup();
const commit_id = try parseOid("dd582c0720819ab7130b103635bd7271b9fd4feb");
try repository.checkout(worktree.dir, commit_id);
var diagnostics: Diagnostics = .{ .allocator = testing.allocator };
defer diagnostics.deinit();
try repository.checkout(worktree.dir, commit_id, &diagnostics);
try testing.expect(diagnostics.errors.items.len == 0);
const expected_files: []const []const u8 = &.{
"dir/file",