Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
update/snapshot_update.cc
update/update_location.cc
update/update_partition_spec.cc
update/update_partition_statistics.cc
update/update_properties.cc
update/update_schema.cc
update/update_snapshot_reference.cc
Expand Down
33 changes: 33 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
// Values written to / matched against the "action" field for the partition
// statistics table updates (set and remove, respectively).
constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics";
constexpr std::string_view kActionRemovePartitionStatistics =
"remove-partition-statistics";

// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
Expand Down Expand Up @@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kSnapshotId] = u.snapshot_id();
break;
}
case TableUpdate::Kind::kSetPartitionStatistics: {
  // Serialize a set-partition-statistics update. The file is emitted under
  // kPartitionStatistics; an update holding no file is emitted as JSON null.
  const auto& set_update =
      internal::checked_cast<const table::SetPartitionStatistics&>(update);
  const auto& stats_file = set_update.partition_statistics_file();
  json[kAction] = kActionSetPartitionStatistics;
  if (!stats_file) {
    json[kPartitionStatistics] = nlohmann::json::value_t::null;
    break;
  }
  json[kPartitionStatistics] = ToJson(*stats_file);
  break;
}
case TableUpdate::Kind::kRemovePartitionStatistics: {
  // Serialize a remove-partition-statistics update: the action name plus the
  // ID of the snapshot whose partition statistics are removed.
  const auto& remove_update =
      internal::checked_cast<const table::RemovePartitionStatistics&>(update);
  json[kAction] = kActionRemovePartitionStatistics;
  json[kSnapshotId] = remove_update.snapshot_id();
  break;
}
}
return json;
}
Expand Down Expand Up @@ -1628,6 +1649,18 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
return std::make_unique<table::RemoveStatistics>(snapshot_id);
}
if (action == kActionSetPartitionStatistics) {
  // Extract the nested "partition-statistics" object, parse it into a
  // PartitionStatisticsFile, and wrap it in the corresponding update.
  // Either step propagates its error via ICEBERG_ASSIGN_OR_RAISE.
  ICEBERG_ASSIGN_OR_RAISE(auto stats_json,
                          GetJsonValue<nlohmann::json>(json, kPartitionStatistics));
  ICEBERG_ASSIGN_OR_RAISE(auto stats_file,
                          PartitionStatisticsFileFromJson(stats_json));
  return std::make_unique<table::SetPartitionStatistics>(std::move(stats_file));
}
if (action == kActionRemovePartitionStatistics) {
  // Only the snapshot ID is required to rebuild a remove update.
  ICEBERG_ASSIGN_OR_RAISE(auto target_snapshot_id,
                          GetJsonValue<int64_t>(json, kSnapshotId));
  return std::make_unique<table::RemovePartitionStatistics>(target_snapshot_id);
}

return JsonParseError("Unknown table update action: {}", action);
}
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ iceberg_sources = files(
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
'update/update_partition_statistics.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_snapshot_reference.cc',
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_statistics.h"
Expand Down Expand Up @@ -214,6 +215,13 @@ Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
return transaction->NewUpdateStatistics();
}

/// \brief Create an UpdatePartitionStatistics bound to a fresh update transaction.
///
/// The transaction is created over this table with kind kUpdate and
/// auto_commit enabled; a failure to create it is propagated to the caller.
Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStatistics() {
  ICEBERG_ASSIGN_OR_RAISE(
      auto txn, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
                                  /*auto_commit=*/true));
  return txn->NewUpdatePartitionStatistics();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();

/// \brief Create a new UpdatePartitionStatistics to update partition statistics and
/// commit the changes.
///
/// \return A Result holding the shared UpdatePartitionStatistics, or the error
/// raised while setting it up.
virtual Result<std::shared_ptr<UpdatePartitionStatistics>>
NewUpdatePartitionStatistics();

/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
Expand Down
45 changes: 43 additions & 2 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl {
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
Status RemoveStatistics(int64_t snapshot_id);
// Adds the given partition statistics file, or replaces the entry already
// recorded for the same snapshot ID. Rejects a null file.
Status SetPartitionStatistics(
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
// Removes the partition statistics entries recorded for snapshot_id, if any.
Status RemovePartitionStatistics(int64_t snapshot_id);

Result<std::unique_ptr<TableMetadata>> Build();

Expand Down Expand Up @@ -1208,6 +1211,41 @@ Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
return {};
}

/// \brief Record a partition statistics file in the metadata being built.
///
/// If an entry with the same snapshot ID already exists, the first such entry
/// is replaced; otherwise the file is appended. A SetPartitionStatistics change
/// is always recorded.
///
/// \param partition_statistics_file File to set; must not be null (prechecked).
/// \return An empty (OK) status once the file and the change are recorded.
Status TableMetadataBuilder::Impl::SetPartitionStatistics(
    std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {
  ICEBERG_PRECHECK(partition_statistics_file != nullptr,
                   "Cannot set null partition statistics file");

  auto& files = metadata_.partition_statistics;
  const auto target_id = partition_statistics_file->snapshot_id;

  // Null entries are skipped rather than dereferenced.
  const auto existing = std::ranges::find_if(files, [target_id](const auto& entry) {
    return entry && entry->snapshot_id == target_id;
  });

  if (existing == files.end()) {
    files.push_back(partition_statistics_file);
  } else {
    *existing = partition_statistics_file;
  }

  // The parameter is no longer needed locally, so hand it to the change record.
  changes_.push_back(std::make_unique<table::SetPartitionStatistics>(
      std::move(partition_statistics_file)));
  return {};
}

/// \brief Drop every partition statistics entry recorded for a snapshot.
///
/// A RemovePartitionStatistics change is recorded only when at least one entry
/// was actually erased; removing a snapshot with no entries is a silent no-op.
///
/// \param snapshot_id Snapshot whose partition statistics are removed.
/// \return An empty (OK) status in all cases.
Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t snapshot_id) {
  // Null entries never match and are left in place.
  const auto matches_snapshot = [snapshot_id](const auto& entry) {
    return entry && entry->snapshot_id == snapshot_id;
  };

  if (std::erase_if(metadata_.partition_statistics, matches_snapshot) == 0) {
    return {};
  }

  changes_.push_back(std::make_unique<table::RemovePartitionStatistics>(snapshot_id));
  return {};
}

std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
Expand Down Expand Up @@ -1636,12 +1674,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id

/// \brief Set the partition statistics file for a snapshot on this builder.
///
/// Delegates to Impl::SetPartitionStatistics. A failing status is handled by
/// ICEBERG_BUILDER_RETURN_IF_ERROR (macro definition not visible here).
///
/// \param partition_statistics_file File to add or replace; the implementation
/// rejects a null pointer.
/// \return Reference to this builder, for chaining.
TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
    const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(
      impl_->SetPartitionStatistics(partition_statistics_file));
  return *this;
}

/// \brief Remove the partition statistics recorded for a snapshot on this builder.
///
/// Delegates to Impl::RemovePartitionStatistics. A failing status is handled by
/// ICEBERG_BUILDER_RETURN_IF_ERROR (macro definition not visible here).
///
/// \param snapshot_id Snapshot whose partition statistics are removed.
/// \return Reference to this builder, for chaining.
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(
    int64_t snapshot_id) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id));
  return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetProperties(
Expand Down
56 changes: 56 additions & 0 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,60 @@ std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
return std::make_unique<RemoveStatistics>(snapshot_id_);
}

// SetPartitionStatistics

/// \brief Snapshot ID stored in the held partition statistics file.
///
/// The stored pointer is dereferenced without a null check, so this must only
/// be called when the update holds a file.
int64_t SetPartitionStatistics::snapshot_id() const {
  const auto& file = *partition_statistics_file_;
  return file.snapshot_id;
}

/// \brief Apply this update by setting the held partition statistics file on
/// the given metadata builder.
void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.SetPartitionStatistics(partition_statistics_file_);
}

/// \brief Intentionally empty: `context` is left untouched.
void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
// SetPartitionStatistics doesn't generate any requirements
}

/// \brief Compare with another update by kind and by the pointed-to file.
///
/// Two updates are equal when `other` is also a SetPartitionStatistics and
/// either both hold no file or both files compare equal by value.
bool SetPartitionStatistics::Equals(const TableUpdate& other) const {
  if (other.kind() != Kind::kSetPartitionStatistics) {
    return false;
  }
  const auto& lhs = partition_statistics_file_;
  const auto& rhs =
      internal::checked_cast<const SetPartitionStatistics&>(other)
          .partition_statistics_file_;

  // With at least one side empty, equality means both sides are empty.
  if (!lhs || !rhs) {
    return !lhs && !rhs;
  }
  return *lhs == *rhs;
}

/// \brief Create a new SetPartitionStatistics sharing the same file pointer
/// (the pointed-to file itself is not copied).
std::unique_ptr<TableUpdate> SetPartitionStatistics::Clone() const {
return std::make_unique<SetPartitionStatistics>(partition_statistics_file_);
}

// RemovePartitionStatistics

/// \brief Apply this update by removing the partition statistics of the stored
/// snapshot ID from the given metadata builder.
void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.RemovePartitionStatistics(snapshot_id_);
}

/// \brief Intentionally empty: `context` is left untouched.
void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
// RemovePartitionStatistics doesn't generate any requirements
}

bool RemovePartitionStatistics::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemovePartitionStatistics) {
return false;
}
const auto& other_remove =
internal::checked_cast<const RemovePartitionStatistics&>(other);
return snapshot_id_ == other_remove.snapshot_id_;
}

/// \brief Create a new RemovePartitionStatistics targeting the same snapshot ID.
std::unique_ptr<TableUpdate> RemovePartitionStatistics::Clone() const {
return std::make_unique<RemovePartitionStatistics>(snapshot_id_);
}

} // namespace iceberg::table
50 changes: 50 additions & 0 deletions src/iceberg/table_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate {
kSetLocation,
kSetStatistics,
kRemoveStatistics,
kSetPartitionStatistics,
kRemovePartitionStatistics,
};

virtual ~TableUpdate();
Expand Down Expand Up @@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
int64_t snapshot_id_;
};

/// \brief Represents setting partition statistics for a snapshot
///
/// Holds a shared pointer to the PartitionStatisticsFile to set. The
/// constructor performs no null check on that pointer.
class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate {
public:
/// \brief Construct the update from the partition statistics file to set.
explicit SetPartitionStatistics(
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file)
: partition_statistics_file_(std::move(partition_statistics_file)) {}

/// \brief Snapshot ID of the held file; requires a non-null file.
int64_t snapshot_id() const;

/// \brief The held partition statistics file (may be null).
const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file() const {
return partition_statistics_file_;
}

/// \brief Set the held file on the given metadata builder.
void ApplyTo(TableMetadataBuilder& builder) const override;

/// \brief Generates no requirements for this update.
void GenerateRequirements(TableUpdateContext& context) const override;

/// \brief Always Kind::kSetPartitionStatistics.
Kind kind() const override { return Kind::kSetPartitionStatistics; }

/// \brief Equal when `other` has the same kind and an equal (or equally
/// absent) file.
bool Equals(const TableUpdate& other) const override;

/// \brief Copy of this update sharing the same file pointer.
std::unique_ptr<TableUpdate> Clone() const override;

private:
// File to set; shared with whoever constructed or cloned this update.
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file_;
};

/// \brief Represents removing partition statistics for a snapshot
///
/// Carries only the ID of the snapshot whose partition statistics are removed.
class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate {
public:
/// \brief Construct the update for the given snapshot ID.
explicit RemovePartitionStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}

/// \brief ID of the snapshot whose partition statistics are removed.
int64_t snapshot_id() const { return snapshot_id_; }

/// \brief Remove the snapshot's partition statistics on the given builder.
void ApplyTo(TableMetadataBuilder& builder) const override;

/// \brief Generates no requirements for this update.
void GenerateRequirements(TableUpdateContext& context) const override;

/// \brief Always Kind::kRemovePartitionStatistics.
Kind kind() const override { return Kind::kRemovePartitionStatistics; }

/// \brief Equal when `other` has the same kind and the same snapshot ID.
bool Equals(const TableUpdate& other) const override;

/// \brief Copy of this update with the same snapshot ID.
std::unique_ptr<TableUpdate> Clone() const override;

private:
// Snapshot whose partition statistics this update removes.
int64_t snapshot_id_;
};

} // namespace table

} // namespace iceberg
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_properties_test.cc
update_schema_test.cc
update_sort_order_test.cc
update_statistics_test.cc)
update_statistics_test.cc
update_partition_statistics_test.cc)

add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)

Expand Down
37 changes: 37 additions & 0 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
update);
}

// Round-trips a SetPartitionStatistics update through JSON: serialization must
// produce the expected document, and parsing that document must yield an update
// equal to the original.
TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) {
// Build the partition statistics file carried by the update.
auto partition_stats_file = std::make_shared<PartitionStatisticsFile>();
partition_stats_file->snapshot_id = 123456789;
partition_stats_file->path =
"s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet";
partition_stats_file->file_size_in_bytes = 2048;

table::SetPartitionStatistics update(partition_stats_file);
// Note that the file's `path` field is serialized under "statistics-path".
nlohmann::json expected = R"({
"action": "set-partition-statistics",
"partition-statistics": {
"snapshot-id": 123456789,
"statistics-path": "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet",
"file-size-in-bytes": 2048
}
})"_json;

// Serialization direction.
EXPECT_EQ(ToJson(update), expected);
// Parsing direction: the parsed update must compare equal to the original.
auto parsed = TableUpdateFromJson(expected);
ASSERT_THAT(parsed, IsOk());
EXPECT_EQ(*internal::checked_cast<table::SetPartitionStatistics*>(parsed.value().get()),
update);
}

// Round-trips a RemovePartitionStatistics update through JSON in both
// directions: serialize and compare, then parse and compare.
TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) {
  nlohmann::json expected_json =
      R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json;
  table::RemovePartitionStatistics original(123456789);

  EXPECT_EQ(ToJson(original), expected_json);

  auto round_tripped = TableUpdateFromJson(expected_json);
  ASSERT_THAT(round_tripped, IsOk());
  auto* parsed_update = internal::checked_cast<table::RemovePartitionStatistics*>(
      round_tripped.value().get());
  EXPECT_EQ(*parsed_update, original);
}

TEST(JsonInternalTest, TableUpdateUnknownAction) {
nlohmann::json json = R"({"action":"unknown-action"})"_json;
auto result = TableUpdateFromJson(json);
Expand Down
Loading
Loading