Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListOffsets KIP : 396 #4225

Merged
merged 22 commits into from
Oct 17, 2023
Merged

ListOffsets KIP : 396 #4225

merged 22 commits into from
Oct 17, 2023

Conversation

mahajanadhitya
Copy link
Contributor

… queue implemented

@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch from cbd499c to 9f89db5 Compare May 26, 2023 11:42
@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch from 6b35ec6 to 3689716 Compare July 18, 2023 09:58
@cla-assistant
Copy link

cla-assistant bot commented Aug 21, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Collaborator

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First review comments about general checks and rdkafka.h additions.

examples/list_offsets.c Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
examples/list_offsets.c Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch 3 times, most recently from 09524c4 to 6fcce61 Compare September 28, 2023 06:16
examples/list_offsets Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated
@@ -9732,6 +9767,26 @@ rd_kafka_error_t *rd_kafka_commit_transaction(rd_kafka_t *rk, int timeout_ms);
RD_EXPORT
rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms);

typedef enum rd_kafka_OffsetSpec_s {
Copy link
Collaborator

@emasab emasab Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing to move the section.
You can also put it after DeleteConsumerGroupOffsets, before Admin API - User SCRAM credentials

src/rdkafka_admin.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka.h Show resolved Hide resolved
Copy link
Collaborator

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some things to change to keep backward compatibility for consumer rd_kafka_query_watermark_offsets and rd_kafka_offsets_for_times functions and to upgrade internal consumer calls too.

src/rdkafka_admin.c Outdated Show resolved Hide resolved
src/rdkafka_admin.h Outdated Show resolved Hide resolved
src/rdkafka_admin.h Outdated Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
src/rdkafka_request.c Show resolved Hide resolved
src/rdkafka_request.c Outdated Show resolved Hide resolved
@emasab
Copy link
Collaborator

emasab commented Oct 12, 2023

rd_kafka_offsets_for_times needs to be fixed as well but will be in a different PR

if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Remove its cache in case the topic isn't a known topic. */
rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_delete_by_name(rk, state->topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check here also that if cache is hinted for this topic, and in that case, do nothing as a refresh is already ongoing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for ListOffsets

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already ensured by rd_kafka_metadata_cache_delete_by_name because it does a rd_kafka_metadata_cache_find with valid equal to 1.

Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It mostly looks great, thanks for the PR @mahajanadhitya and @emasab !
Some minor comments about the conventions and declarations of the variables
Two comments for the tests.
Good fix for the not leader issue, let's slowly (in other PRs) see if we can extend it to other admin functions which have similar retriable errors.

CHANGELOG.md Outdated Show resolved Hide resolved
rd_usleep(100000, 0);
} else if (err) {
TEST_FAIL("Failed with error: %s",
rd_kafka_err2name(err));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break the outer loop here, it's cleaner that way without the else

goto err;
}

rd_list_t *topic_partitions_sorted = rd_list_new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move declaration up


rd_list_t *topic_partitions_sorted = rd_list_new(
topic_partitions->cnt, rd_kafka_topic_partition_destroy_free);
for (i = 0; i < topic_partitions->cnt; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (i = 0; i < topic_partitions->cnt; i++) {
for (i = 0; i < topic_partitions->cnt; i++)

rd_list_add(
topic_partitions_sorted,
rd_kafka_topic_partition_copy(&topic_partitions->elems[i]));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}

err = rd_kafka_event_error(event);
if (err == RD_KAFKA_RESP_ERR__NOENT) {
/* Still looking for the leader */
rd_usleep(100000, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: continue the loop here, avoids if elses

src/rdkafka.h Outdated
@@ -6829,7 +6850,9 @@ typedef enum rd_kafka_admin_op_t {
RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS,
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
/** ListOffsets */
RD_KAFKA_ADMIN_OP_LISTOFFSETS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RD_KAFKA_ADMIN_OP_LISTOFFSETS,
RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */

src/rdkafka.h Outdated
@@ -6829,7 +6850,9 @@ typedef enum rd_kafka_admin_op_t {
RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS,
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
/** ListOffsets */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** ListOffsets */


rd_kafka_event_destroy(rkev);

rd_kafka_mock_broker_push_request_error_rtts(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the watermarks test - replace push of errors with rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); instead and let the mock handler introduce the error


TEST_SAY("Testing offset %" PRId64 "\n", test_fixture.query);

rd_kafka_topic_partition_list_t *topic_partitions_copy =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declaration might need to be moved up

Copy link
Collaborator

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I approve. Given there are my changes too, let's wait for a second approval by @milindl

Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these changes!

rd_kafka_query_watermark_offsets metadata refresh bug
@emasab emasab merged commit 99c67d3 into master Oct 17, 2023
1 check was pending
@emasab emasab deleted the feature/listOffsets-AdminClient branch October 17, 2023 08:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants