-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_groups.py
More file actions
124 lines (106 loc) · 4.83 KB
/
consumer_groups.py
File metadata and controls
124 lines (106 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Consumer group management endpoints"""
import logging

from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, TopicPartition
from fastapi import APIRouter, HTTPException, status, Depends

from auth import get_current_user, User
from services.kafka_admin import get_admin_client
from services.kafka_config import get_consumer_config

# Module-level logger, named after this module so records can be filtered by origin.
logger = logging.getLogger(__name__)
# Router collecting every endpoint defined in this module; all routes are
# served under the "/consumer-groups" prefix and tagged "Consumer Groups".
router = APIRouter(prefix="/consumer-groups", tags=["Consumer Groups"])
@router.get("")
async def list_consumer_groups(current_user: User = Depends(get_current_user)):
    """Return the consumer groups reported by the Kafka cluster.

    Groups whose listing entry carries an error are left out of the result.

    Args:
        current_user: Resolved by the ``get_current_user`` dependency; the
            value itself is not read in the body.

    Returns:
        A dict with ``count`` (number of groups returned) and
        ``consumer_groups`` (one dict per group holding ``group_id``,
        ``protocol_type`` and ``state``; ``state`` is ``"Unknown"`` when the
        entry has no such attribute).

    Raises:
        HTTPException: 500 when the listing call fails for any reason.
    """
    try:
        healthy_groups = [
            {
                "group_id": entry.id,
                "protocol_type": entry.protocol_type,
                "state": getattr(entry, "state", "Unknown"),
            }
            for entry in get_admin_client().list_groups(timeout=10)
            if entry.error is None
        ]
    except Exception as e:
        # One message serves both the log record and the HTTP error detail.
        failure = f"Failed to list consumer groups: {str(e)}"
        logger.error(failure)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )

    return {
        "count": len(healthy_groups),
        "consumer_groups": healthy_groups,
    }
def _watermark_lag(consumer, tp):
    """Return ``(log_end_offset, lag)`` for one committed partition offset.

    Falls back to ``(-1, -1)`` when the high watermark cannot be fetched, so a
    single unreachable partition does not hide the remaining ones.
    """
    try:
        # get_watermark_offsets may yield None on timeout; the failed unpacking
        # is handled below like any other lookup error.
        _low, high = consumer.get_watermark_offsets(
            TopicPartition(tp.topic, tp.partition),
            timeout=5,
        )
    except Exception as e:
        logger.warning(
            f"Failed to get watermark offsets for {tp.topic}[{tp.partition}]: {str(e)}"
        )
        return -1, -1
    return high, high - tp.offset


@router.get("/{group_id}")
async def describe_consumer_group(group_id: str, current_user: User = Depends(get_current_user)):
    """Get detailed information about a specific consumer group.

    Checks that the group is listed by the cluster, then reports the committed
    offset, log end offset and lag for every partition that has a committed
    offset for the group.

    Args:
        group_id: Identifier of the consumer group, taken from the URL path.
        current_user: Resolved by the ``get_current_user`` dependency; the
            value itself is not read in the body.

    Returns:
        A dict with ``group_id``, ``state``, ``protocol_type``, ``members``,
        ``partition_assignments`` and ``member_count``. If the detailed lookup
        fails, a reduced dict with ``state`` set to ``"Unknown"`` and an
        explanatory ``note`` is returned instead.

    Raises:
        HTTPException: 404 when the group is not listed (or is listed with an
            error); 500 when listing the groups fails unexpectedly.
    """
    # NOTE(review): the Kafka calls below are synchronous and are not awaited
    # inside this async handler -- confirm that is acceptable for the event loop.
    try:
        admin_client = get_admin_client()
        group_exists = any(
            group.id == group_id and group.error is None
            for group in admin_client.list_groups(timeout=10)
        )
        if not group_exists:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Consumer group '{group_id}' not found"
            )

        try:
            # The admin API expects ConsumerGroupTopicPartitions requests, not
            # bare group-id strings; a request without partitions asks for all
            # committed offsets of the group.
            offset_futures = admin_client.list_consumer_group_offsets(
                [ConsumerGroupTopicPartitions(group_id)]
            )
            group_info = {
                "group_id": group_id,
                "state": "Active",
                "protocol_type": "consumer",
                "members": [],
                "partition_assignments": []
            }
            for future in offset_futures.values():
                try:
                    # The future resolves to a ConsumerGroupTopicPartitions
                    # object; the offsets live in its topic_partitions list.
                    committed = [
                        tp
                        for tp in future.result(timeout=10).topic_partitions
                        if tp.offset >= 0  # negative offsets mean nothing committed
                    ]
                    if not committed:
                        continue
                    # One short-lived consumer serves all watermark lookups
                    # instead of connecting once per partition.
                    temp_consumer = Consumer(get_consumer_config('temp-admin-consumer'))
                    try:
                        for tp in committed:
                            high, lag = _watermark_lag(temp_consumer, tp)
                            group_info["partition_assignments"].append({
                                "topic": tp.topic,
                                "partition": tp.partition,
                                "current_offset": tp.offset,
                                "log_end_offset": high,
                                "lag": lag
                            })
                    finally:
                        temp_consumer.close()
                except Exception as e:
                    logger.warning(f"Failed to get offsets for group '{group_id}': {str(e)}")

            # NOTE(review): this counts distinct partition numbers, not actual
            # group members; kept as-is for response compatibility.
            group_info["member_count"] = len({
                pa["partition"] for pa in group_info["partition_assignments"]
            })
            return group_info
        except Exception as e:
            logger.warning(f"Failed to get detailed info for group '{group_id}': {str(e)}")
            return {
                "group_id": group_id,
                "state": "Unknown",
                "protocol_type": "consumer",
                "members": [],
                "partition_assignments": [],
                "note": "Limited information available. Group may be empty or inactive."
            }
    except HTTPException:
        # Let the deliberate 404 above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to describe consumer group '{group_id}': {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to describe consumer group: {str(e)}"
        ) from e