-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_web_client.py
More file actions
1401 lines (1176 loc) · 70.6 KB
/
mcp_web_client.py
File metadata and controls
1401 lines (1176 loc) · 70.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# web_client.py
import asyncio
import json
import os
import sys
import re
import uuid
import logging
import shutil
import argparse
from quart import Quart, request, jsonify, render_template, Response
from quart_cors import cors
import hypercorn.asyncio
from hypercorn.config import Config
from enum import Enum, auto
from datetime import datetime
# This environment variable MUST be set to "false" before any LangChain
# modules are imported to programmatically disable the problematic tracer.
# (Import-order sensitive: keep this line above the langchain imports below.)
os.environ["LANGCHAIN_TRACING_V2"] = "false"
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.resources import load_mcp_resources
# Using the base google.generativeai library for stateful chat
import google.generativeai as genai
# --- Globals for Web App ---
# Quart (async Flask-compatible) application, wrapped so that any browser
# origin may call the API.
app = Quart(__name__)
app = cors(app, allow_origin="*")  # Enable CORS for all origins
# --- App Configuration ---
class AppConfig:
    """Mutable runtime feature flags, toggled elsewhere in the app."""
    # All flags default to the most restrictive setting.
    ALL_MODELS_UNLOCKED = False      # restrict model choice to CERTIFIED_MODEL unless unlocked
    CHARTING_ENABLED = False         # whether chart tools may be used
    TERADATA_MCP_CONNECTED = False   # connection state of the Teradata MCP server
    CHART_MCP_CONNECTED = False      # connection state of the chart MCP server
# Shared singleton instance read/written across the module.
APP_CONFIG = AppConfig()
# The model this client is certified against.
CERTIFIED_MODEL = "gemini-1.5-flash-8b-latest"
# --- System Prompt Templates ---
# Per-provider system prompt templates. The Google template contains literal
# `{charting_instructions}`, `{tools_context}`, `{prompts_context}` and
# `{charts_context}` placeholders that are filled in via str.format elsewhere;
# JSON examples use doubled braces (`{{`/`}}`) so format() leaves them intact.
PROVIDER_SYSTEM_PROMPTS = {
    "Google": (
        "You are a specialized assistant for interacting with a Teradata database. Your primary goal is to fulfill user requests by selecting the best tool, prompt, or sequence of tools.\n\n"
        "--- **Core Reasoning Hierarchy** ---\n"
        "1. **Check for a Perfect Prompt:** First, analyze the user's request and see if there is a single, pre-defined **prompt** that exactly matches the user's intent and scope.\n"
        "2. **Synthesize a Plan from Tools:** If no single prompt is a perfect match, you must become a **planner**. Create a logical sequence of steps to solve the user's request.\n"
        "3. **Execute the First Step:** Your response will be the JSON for the *first tool* in your plan.\n\n"
        "--- **CRITICAL RULE: CONTEXT and PARAMETER INFERENCE** ---\n"
        "You **MUST** remember and reuse information from previous turns.\n"
        "**Example of CORRECT Inference:**\n"
        " - USER (Turn 1): \"what is the business description for the `equipment` table in database `DEMO_Customer360_db`?\"\n"
        " - ASSISTANT (Turn 1): (Executes the request)\n"
        " - USER (Turn 2): \"ok now what is the quality of that table?\"\n"
        " - YOUR CORRECT REASONING (Turn 2): \"The user is asking about 'that table'. The previous turn mentioned the `equipment` table in the `DEMO_Customer360_db` database. I will reuse these parameters.\"\n"
        " - YOUR CORRECT ACTION (Turn 2): `json ...` for `qlty_columnSummary` with `db_name`: `DEMO_Customer360_db` and `table_name`: `equipment`.\n\n"
        "--- **CRITICAL RULE: TOOL ARGUMENT ADHERENCE** ---\n"
        "You **MUST** use the exact parameter names provided in the tool definitions. Do not invent or guess parameter names.\n\n"
        "--- **CRITICAL RULE: SQL GENERATION** ---\n"
        "When using the `base_readQuery` tool, if you know the database name, you **MUST** use fully qualified table names in your SQL query (e.g., `SELECT ... FROM my_database.my_table`).\n\n"
        "--- **CRITICAL RULE: HANDLING TIME-SENSITIVE QUERIES** ---\n"
        "If the user asks a question involving a relative date (e.g., 'today', 'yesterday', 'this week'), you do not know this information. Your first step **MUST** be to find the current date before proceeding.\n\n"
        "**Example of CORRECT Multi-Step Plan:**\n"
        " - USER: \"what is the system utilization in number of queries for today?\"\n"
        " - YOUR CORRECT REASONING (Step 1): \"The user is asking about 'today'. I do not know the current date. My first step must be to get the current date from the database.\"\n"
        " - YOUR CORRECT ACTION (Step 1):\n"
        " ```json\n"
        " {{\n"
        " \"tool_name\": \"base_readQuery\",\n"
        " \"arguments\": {{ \"sql\": \"SELECT CURRENT_DATE\" }}\n"
        " }}\n"
        " ```\n"
        " - TOOL RESPONSE (Step 1): `{{\"results\": [{{\"Date\": \"2025-07-29\"}}]}}`\n"
        " - YOUR CORRECT REASONING (Step 2): \"The database returned the current date as 2025-07-29. Now I can use this date to answer the user's original question about system utilization.\"\n"
        " - YOUR CORRECT ACTION (Step 2):\n"
        " ```json\n"
        " {{\n"
        " \"tool_name\": \"dba_resusageSummary\",\n"
        " \"arguments\": {{ \"date\": \"2025-07-29\" }}\n"
        " }}\n"
        " ```\n\n"
        "--- **CRITICAL RULE: TOOL FAILURE AND RECOVERY** ---\n"
        "If a tool call fails with an error message, you **MUST** attempt to recover. Your recovery process is as follows:\n"
        "1. **Analyze the Error:** Read the error message carefully. If it indicates an invalid column, parameter, or dimension (e.g., 'Column not found'), identify the specific argument that caused the failure.\n"
        "2. **Consult Tool Docs:** Review the documentation for the failed tool that is provided in this system prompt.\n"
        "3. **Formulate a New Plan:** Your next thought process should explain the error and propose a corrected tool call. Typically, this means re-issuing the tool call *without* the single failing parameter.\n"
        "4. **Retry the Tool:** Execute the corrected tool call.\n"
        "5. **Ask for Help:** Only if the corrected tool call also fails should you give up and ask the user for clarification.\n\n"
        "{charting_instructions}\n\n"
        "--- **Response Formatting** ---\n"
        "- **To execute a tool:** Respond with 'Thought:' explaining your choice, followed by a ```json ... ``` block with the `tool_name` and `arguments`.\n"
        "- **To execute a prompt:** Respond with 'Thought:' explaining your choice, followed by a ```json ... ``` block with the `prompt_name` and `arguments`.\n"
        "- **Clarifying Question:** Only ask if information is truly missing.\n\n"
        "{tools_context}\n\n"
        "{prompts_context}\n\n"
        "{charts_context}\n\n"
    ),
    # Non-Google providers are not yet implemented.
    "Anthropic": "Placeholder prompt for Anthropic models.",
    "OpenAI": "Placeholder prompt for OpenAI models."
}
# Charting-aggressiveness text spliced into the `{charting_instructions}`
# placeholder of the system prompt. Keys are intensity levels; presumably
# chosen by the user/UI elsewhere — confirm against the caller.
CHARTING_INSTRUCTIONS = {
    "none": "--- **Charting Rules** ---\n- Charting is disabled. Do NOT use any charting tools.",
    "medium": (
        "--- **Charting Rules** ---\n"
        "- After successfully gathering data with Teradata tools, consider if a visualization would enhance the answer.\n"
        "- Use a chart tool if it provides a clear summary (e.g., bar chart for space usage, pie chart for distributions).\n"
        "- Do not generate charts for simple data retrievals that are easily readable in a table.\n"
        "- When you use a chart tool, tell the user in your final answer what the chart represents."
    ),
    "heavy": (
        "--- **Charting Rules** ---\n"
        "- You should actively look for opportunities to visualize data.\n"
        "- After nearly every successful data-gathering operation, your next step should be to call an appropriate chart tool to visualize the results.\n"
        "- Prefer visual answers over text-based tables whenever possible.\n"
        "- When you use a chart tool, tell the user in your final answer what the chart represents."
    )
}
# --- Globals ---
# Context strings injected into the system prompt templates above; replaced
# at runtime once the MCP servers report their capabilities.
tools_context = "--- No Tools Available ---"
prompts_context = "--- No Prompts Available ---"
charts_context = "--- No Charts Available ---"
llm = None         # google.generativeai model handle; set during startup
mcp_client = None  # MultiServerMCPClient; set when servers connect
# This will store the configurations for all connected MCP servers
SERVER_CONFIGS = {}
mcp_tools = {}     # tool name -> tool spec (used for validation below)
mcp_prompts = {}   # prompt name -> prompt spec
mcp_charts = {}    # chart tool name -> spec; chart tools are handled locally
structured_tools = {}
structured_prompts = {}
structured_resources = {}
structured_charts = {}
tool_scopes = {}   # tool name -> scope string (e.g. 'column'); see PlanExecutor.run
SESSIONS = {}      # session id -> per-session state (includes a 'chat' object)
# --- Helper for Server-Sent Events ---
def _format_sse(data: dict, event: str = None) -> str:
"""Formats a dictionary into a server-sent event string."""
msg = f"data: {json.dumps(data)}\n"
if event is not None:
msg += f"event: {event}\n"
return f"{msg}\n"
# --- Core Logic ---
class OutputFormatter:
    """
    Parses raw LLM output and structured tool data to generate professional,
    failure-safe HTML for the UI.

    Entries of ``collected_data`` consumed by a specialized renderer are
    recorded in ``processed_data_indices`` so render() never shows the same
    payload twice.
    """
    def __init__(self, llm_summary_text: str, collected_data: list):
        self.raw_summary = llm_summary_text
        self.collected_data = collected_data
        self.processed_data_indices = set()

    @staticmethod
    def _escape_html(text: str) -> str:
        """Escape the characters that are unsafe inside HTML text nodes."""
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

    def _sanitize_summary(self) -> str:
        """Convert the LLM's markdown summary to HTML, replacing raw markdown
        tables and CREATE TABLE DDL with placeholders (both are rendered by
        dedicated renderers below)."""
        # This regex now also removes markdown tables
        markdown_table_pattern = re.compile(r"\|.*\|[\n\r]*\|[-| :]*\|[\n\r]*(?:\|.*\|[\n\r]*)*", re.MULTILINE)
        clean_summary = re.sub(markdown_table_pattern, "\n(Data table is shown below)\n", self.raw_summary)
        sql_ddl_pattern = re.compile(r"```sql\s*CREATE MULTISET TABLE.*?;?\s*```|CREATE MULTISET TABLE.*?;", re.DOTALL | re.IGNORECASE)
        # BUG FIX: run the DDL substitution on the already-cleaned text.
        # Previously it re-used self.raw_summary, silently discarding the
        # markdown-table removal performed just above.
        clean_summary = re.sub(sql_ddl_pattern, "\n(Formatted DDL shown below)\n", clean_summary)
        lines = clean_summary.strip().split('\n')
        html_output = ""
        in_list = False

        def process_line_markdown(line):
            # "**Label:**" spans, generic bold, then inline code ticks.
            line = re.sub(r'\*{2,3}(.*?):\*{1,3}', r'<strong>\1:</strong>', line)
            line = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', line)
            line = re.sub(r'`(.*?)`', r'<code class="bg-gray-900/70 text-teradata-orange rounded-md px-1.5 py-0.5 font-mono text-sm">\1</code>', line)
            return line

        for line in lines:
            line = line.strip()
            if not line:
                # Blank line closes an open bullet list.
                if in_list:
                    html_output += '</ul>'
                    in_list = False
                continue
            if line.startswith(('* ', '- ')):
                if not in_list:
                    html_output += '<ul class="list-disc list-inside space-y-2 text-gray-300 mb-4">'
                    in_list = True
                content = line[2:]
                processed_content = process_line_markdown(content)
                html_output += f'<li>{processed_content}</li>'
            elif line.startswith('# '):
                if in_list: html_output += '</ul>'; in_list = False
                content = line[2:]
                html_output += f'<h3 class="text-xl font-bold text-white mb-3 border-b border-gray-700 pb-2">{content}</h3>'
            elif line.startswith('## '):
                if in_list: html_output += '</ul>'; in_list = False
                content = line[3:]
                html_output += f'<h4 class="text-lg font-semibold text-white mt-4 mb-2">{content}</h4>'
            else:
                if in_list:
                    html_output += '</ul>'
                    in_list = False
                processed_line = process_line_markdown(line)
                html_output += f'<p class="text-gray-300 mb-4">{processed_line}</p>'
        if in_list:
            html_output += '</ul>'
        return html_output

    def _render_ddl(self, tool_result: dict, index: int) -> str:
        """Render a base_tableDDL result as a copyable SQL code block."""
        if not isinstance(tool_result, dict) or "results" not in tool_result: return ""
        results = tool_result.get("results")
        if not isinstance(results, list) or not results: return ""
        ddl_text = results[0].get('Request Text', 'DDL not available.')
        # BUG FIX: real HTML escaping; the previous replace() calls mapped
        # each character to itself and were no-ops.
        ddl_text_sanitized = self._escape_html(ddl_text)
        metadata = tool_result.get("metadata", {})
        table_name = metadata.get("table", "DDL")
        self.processed_data_indices.add(index)
        return f"""
        <div class="response-card">
        <div class="sql-code-block">
        <div class="sql-header">
        <span>SQL DDL: {table_name}</span>
        <button class="copy-button" onclick="copyToClipboard(this)">
        <svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" viewBox="0 0 16 16"><path d="M4 1.5H3a2 2 0 0 0-2 2V14a2 2 0 0 0 2 2h10a2 2 0 0 0 2-2V3.5a2 2 0 0 0-2-2h-1v1h1a1 1 0 0 1 1 1V14a1 1 0 0 1-1 1H3a1 1 0 0 1-1-1V3.5a1 1 0 0 1 1-1h1v-1z"/><path d="M9.5 1a.5.5 0 0 1 .5.5v1a.5.5 0 0 1-.5.5h-3a.5.5 0 0 1-.5-.5v-1a.5.5 0 0 1 .5-.5h3zM-1 7a.5.5 0 0 1 .5-.5h15a.5.5 0 0 1 0 1H-.5A.5.5 0 0 1-1 7z"/></svg>
        Copy
        </button>
        </div>
        <pre><code class="language-sql">{ddl_text_sanitized}</code></pre>
        </div>
        </div>
        """

    def _render_table(self, tool_result: dict, index: int, default_title: str) -> str:
        """Render a generic tool result ({"results": [row dicts]}) as a table."""
        if not isinstance(tool_result, dict) or "results" not in tool_result: return ""
        results = tool_result.get("results")
        if not isinstance(results, list) or not results or not all(isinstance(item, dict) for item in results): return ""
        metadata = tool_result.get("metadata", {})
        title = metadata.get("table_name", default_title)
        # Column order is taken from the first row.
        headers = results[0].keys()
        html = f"""
        <div class="response-card">
        <h4 class="text-lg font-semibold text-white mb-2">Data: <code>{self._escape_html(str(title))}</code></h4>
        <div class='table-container'>
        <table class='assistant-table'>
        <thead><tr>{''.join(f'<th>{self._escape_html(str(h))}</th>' for h in headers)}</tr></thead>
        <tbody>
        """
        for row in results:
            html += "<tr>"
            for header in headers:
                cell_data = str(row.get(header, ''))
                # BUG FIX: real HTML escaping; the previous replace() calls
                # were no-ops and let markup through into the page.
                sanitized_cell = self._escape_html(cell_data)
                html += f"<td>{sanitized_cell}</td>"
            html += "</tr>"
        html += "</tbody></table></div></div>"
        self.processed_data_indices.add(index)
        return html

    def _render_chart(self, chart_data: dict, index: int) -> str:
        """Render a chart spec as a div the front-end hydrates client-side."""
        chart_id = f"chart-render-target-{uuid.uuid4()}"
        # The data-spec attribute holds the JSON string for the chart.
        # BUG FIX: escape '&' and the single quotes delimiting the attribute
        # so a spec containing those characters cannot break the markup.
        chart_spec_json = json.dumps(chart_data.get("spec", {})).replace("&", "&amp;").replace("'", "&#39;")
        self.processed_data_indices.add(index)
        return f"""
        <div class="response-card">
        <div id="{chart_id}" class="chart-render-target" data-spec='{chart_spec_json}'></div>
        </div>
        """

    def render(self) -> str:
        """Assemble the final HTML: summary card first, then every collected
        tool result not already consumed by a specialized renderer."""
        final_html = ""
        clean_summary_html = self._sanitize_summary()
        if clean_summary_html:
            final_html += f'<div class="response-card summary-card">{clean_summary_html}</div>'
        for i, tool_result in enumerate(self.collected_data):
            if i in self.processed_data_indices or not isinstance(tool_result, dict):
                continue
            # Check for chart type first
            if tool_result.get("type") == "chart":
                final_html += self._render_chart(tool_result, i)
                continue
            metadata = tool_result.get("metadata", {})
            tool_name = metadata.get("tool_name")
            if tool_name == 'base_tableDDL':
                final_html += self._render_ddl(tool_result, i)
            elif tool_name and "results" in tool_result:
                final_html += self._render_table(tool_result, i, f"Result for {tool_name}")
        if not final_html.strip():
            return "<p>The agent completed its work but did not produce a visible output.</p>"
        return final_html
async def call_llm_api(prompt: str, session_id: str = None, chat_history=None) -> str:
    """Send *prompt* to the Gemini chat API and return the stripped reply text.

    If *chat_history* is given (even an empty list), a fresh one-off chat is
    started from it; otherwise the persistent chat stored in
    SESSIONS[session_id]['chat'] is used (raises KeyError if absent —
    presumably callers always pass a valid session; confirm).

    Returns:
        The response text, or ``None`` on any failure — callers must handle
        the None case.
    """
    if not llm: raise RuntimeError("LLM is not initialized.")
    # Use a passed chat_history if provided (for stateless calls), otherwise use the session's chat
    chat_session = llm.start_chat(history=chat_history) if chat_history is not None else SESSIONS[session_id]['chat']
    llm_logger = logging.getLogger("llm_conversation")
    try:
        full_log_message = ""
        # Log the history from the actual session being used
        history_for_log = chat_session.history
        if history_for_log:
            formatted_lines = [f"[{msg.role}]: {msg.parts[0].text}" for msg in history_for_log]
            formatted_history = "\n".join(formatted_lines)
            full_log_message += f"--- FULL CONTEXT (Session: {session_id or 'one-off'}) ---\n--- History ---\n{formatted_history}\n\n"
        full_log_message += f"--- Current User Prompt ---\n{prompt}\n"
        llm_logger.info(full_log_message)
        response = await chat_session.send_message_async(prompt)
        if not response or not hasattr(response, 'text'):
            raise RuntimeError("LLM returned an empty or invalid response.")
        response_text = response.text.strip()
        llm_logger.info(f"--- RESPONSE ---\n{response_text}\n" + "-"*50 + "\n")
        return response_text
    except Exception as e:
        # Deliberate catch-all: an LLM failure degrades to a None return
        # rather than crashing the request pipeline.
        app.logger.error(f"Error calling LLM API: {e}", exc_info=True)
        llm_logger.error(f"--- ERROR in LLM call ---\n{e}\n" + "-"*50 + "\n")
        return None
async def validate_and_correct_parameters(command: dict) -> dict:
    """
    Validates LLM-generated parameters against the tool spec and attempts correction.
    Returns a corrected command or a command indicating failure.

    On unrecoverable mismatch, returns a dict with ``error: parameter_mismatch``
    and the tool's specification so the UI can ask the user for values.
    """
    tool_name = command.get("tool_name")
    if not tool_name or tool_name not in mcp_tools:
        # Unknown tools pass through; invoke_mcp_tool reports them.
        return command
    args = command.get("arguments", {})
    # --- START: Programmatic Shim for Legacy Quality Tools ---
    # These tools take one fully qualified 'table_name' instead of the
    # db_name/table_name pair the LLM tends to emit.
    LEGACY_QUALITY_TOOLS = [
        "qlty_missingValues", "qlty_negativeValues", "qlty_distinctCategories",
        "qlty_standardDeviation", "qlty_columnSummary", "qlty_univariateStatistics",
        "qlty_rowsWithMissingValues"
    ]
    if tool_name in LEGACY_QUALITY_TOOLS:
        db_name = args.get("db_name")
        table_name = args.get("table_name")
        if db_name and table_name and '.' not in table_name:
            args["table_name"] = f"{db_name}.{table_name}"
            del args["db_name"]
            app.logger.info(f"Applied shim for '{tool_name}': Combined db_name and table_name.")
    # --- END: Programmatic Shim ---
    llm_arg_names = set(args.keys())
    tool_spec = mcp_tools[tool_name]
    spec_arg_names = set(tool_spec.args.keys())
    required_params = {name for name, field in tool_spec.args.items() if field.get("required", False)}
    # A call is valid if all required parameters are present; missing
    # optional parameters are fine.
    if required_params.issubset(llm_arg_names):
        return command
    app.logger.info(f"Parameter mismatch for tool '{tool_name}'. Attempting correction with LLM.")
    correction_prompt = f"""
    You are a parameter-mapping specialist. Your task is to map the 'LLM-Generated Parameters' to the 'Official Tool Parameters'.
    The user wants to call the tool '{tool_name}', which is described as: '{tool_spec.description}'.
    Official Tool Parameters: {list(spec_arg_names)}
    LLM-Generated Parameters: {list(llm_arg_names)}
    Respond with a single JSON object that maps each generated parameter name to its correct official name.
    If a generated parameter does not sensibly map to any official parameter, use `null` as the value.
    Example response: {{"database": "db_name", "table": "table_name", "extra_param": null}}
    """
    correction_response_text = await call_llm_api(prompt=correction_prompt, chat_history=[])
    try:
        # call_llm_api returns None on failure; fail fast instead of letting
        # re.search raise a TypeError that the except clause below misses.
        if not correction_response_text:
            raise ValueError("LLM returned no response for parameter mapping.")
        fenced_match = re.search(r"```json\s*\n(.*?)\n\s*```", correction_response_text, re.DOTALL)
        if fenced_match:
            # BUG FIX: group(1) is the JSON payload inside the fences.
            # group(0) included the ```json fences and always failed to parse.
            mapping_json = fenced_match.group(1)
        else:
            bare_match = re.search(r'\{.*\}', correction_response_text, re.DOTALL)
            if not bare_match:
                raise ValueError("LLM did not return a valid JSON object for parameter mapping.")
            mapping_json = bare_match.group(0)
        name_mapping = json.loads(mapping_json.strip())
        if any(v is None for v in name_mapping.values()):
            raise ValueError("LLM could not confidently map all parameters.")
        corrected_args = {}
        for llm_name, spec_name in name_mapping.items():
            # Keep only mappings whose source exists and whose target is real.
            if llm_name in args and spec_name in spec_arg_names:
                corrected_args[spec_name] = args[llm_name]
        if not required_params.issubset(set(corrected_args.keys())):
            raise ValueError(f"Corrected parameters are still missing required arguments. Missing: {required_params - set(corrected_args.keys())}")
        app.logger.info(f"Successfully corrected parameters for tool '{tool_name}'. New args: {corrected_args}")
        command['arguments'] = corrected_args
        return command
    except (ValueError, json.JSONDecodeError, AttributeError) as e:
        # Correction failed: hand the tool spec back to the UI so the user
        # can fill in the parameters manually.
        app.logger.warning(f"Parameter correction failed for '{tool_name}': {e}. Requesting user input.")
        spec_arguments = list(tool_spec.args.values())
        return {
            "error": "parameter_mismatch",
            "tool_name": tool_name,
            "message": "The agent could not determine the correct parameters for the tool. Please provide them below.",
            "specification": {
                "name": tool_name,
                "description": tool_spec.description,
                "arguments": spec_arguments
            }
        }
async def invoke_mcp_tool(command: dict) -> any:
    """Execute a tool command: chart tools are rendered locally into a chart
    spec; all other tools are validated, normalized, and invoked on the
    Teradata MCP server over a temporary session.

    Returns either the tool's parsed JSON result, a chart-spec dict
    (``{"type": "chart", ...}``), or a dict with an ``error`` key.
    """
    # Chart tools skip parameter validation (they are handled locally below).
    if command.get("tool_name") not in mcp_charts:
        validated_command = await validate_and_correct_parameters(command)
        if "error" in validated_command:
            return validated_command
    else:
        validated_command = command
    global mcp_client
    if not mcp_client:
        return {"error": "MCP client is not connected."}
    tool_name = validated_command.get("tool_name")
    # Accept either key name for arguments; the LLM emits both.
    args = validated_command.get("arguments", validated_command.get("parameters", {}))
    if tool_name in mcp_charts:
        app.logger.info(f"Locally handling chart generation for tool: {tool_name}")
        try:
            is_bar_chart = "generate_bar_chart" in tool_name
            data = args.get("data", [])
            # --- START: MODIFIED CHART LOGIC ---
            # Prioritize explicit axis arguments from the LLM
            x_field = args.get("x_axis") or args.get("x_field")
            y_field = args.get("y_axis") or args.get("y_field")
            angle_field = args.get("angle_field")
            color_field = args.get("color_field")
            # Fallback to inference only if explicit fields are missing
            if not x_field or not y_field:
                app.logger.info("Axis fields not specified by LLM, inferring from data types.")
                if data:
                    first_row = data[0]
                    # Find first string-like column for x-axis
                    x_field = next((k for k, v in first_row.items() if isinstance(v, str)), None)
                    # Find first numeric-like column for y-axis (handles strings of numbers)
                    y_field = next((k for k, v in first_row.items() if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace('.', '', 1).isdigit())), None)
            # Ensure the data for the y-axis is numeric
            if y_field and data:
                for row in data:
                    try:
                        row[y_field] = float(row[y_field])
                    except (ValueError, TypeError):
                        app.logger.warning(f"Could not convert value '{row.get(y_field)}' to float for y-axis '{y_field}'.")
                        row[y_field] = 0  # Default to 0 if conversion fails
            spec_options = {
                "data": data,
                "xField": y_field if is_bar_chart else x_field,  # Bar charts in G2Plot swap x/y
                "yField": x_field if is_bar_chart else y_field,
                "angleField": angle_field,
                "colorField": color_field,
                "seriesField": args.get("series_field", args.get("series")),
                "title": { "visible": True, "text": args.get("title", "Generated Chart") }
            }
            # --- END: MODIFIED CHART LOGIC ---
            # Map the tool name to a G2Plot chart type; default to Column.
            chart_type_mapping = {
                "generate_bar_chart": "Bar", "generate_column_chart": "Column",
                "generate_pie_chart": "Pie", "generate_line_chart": "Line",
                "generate_area_chart": "Area", "generate_scatter_chart": "Scatter",
                "generate_histogram_chart": "Histogram", "generate_boxplot_chart": "Box",
                "generate_dual_axes_chart": "DualAxes",
            }
            plot_type = next((v for k, v in chart_type_mapping.items() if k in tool_name), "Column")
            # Drop unset options so the front-end receives a minimal spec.
            final_spec_options = {k: v for k, v in spec_options.items() if v is not None}
            chart_spec = { "type": plot_type, "options": final_spec_options }
            return {"type": "chart", "spec": chart_spec, "metadata": {"tool_name": tool_name}}
        except Exception as e:
            app.logger.error(f"Error during local chart generation: {e}", exc_info=True)
            return {"error": f"Failed to generate chart spec locally: {e}"}
    server_name = "teradata_mcp_server"
    if tool_name not in mcp_tools:
        return {"error": f"Tool '{tool_name}' not found in any connected server."}
    # General parameter name normalization
    if 'database_name' in args: args['db_name'] = args.pop('database_name')
    if 'database' in args: args['db_name'] = args.pop('database')
    if 'table' in args: args['table_name'] = args.pop('table')
    if 'column_name' in args and 'col_name' not in args:
        args['col_name'] = args.pop('column_name')
    try:
        app.logger.debug(f"Creating temporary session on '{server_name}' to invoke tool '{tool_name}' with args: {args}")
        async with mcp_client.session(server_name) as temp_session:
            call_tool_result = await temp_session.call_tool(tool_name, args)
        app.logger.debug(f"Successfully invoked tool '{tool_name}'. Raw response: {call_tool_result}")
        # Expected shape: result.content is a non-empty list whose first
        # element carries a .text attribute holding a JSON string.
        if hasattr(call_tool_result, 'content') and isinstance(call_tool_result.content, list) and len(call_tool_result.content) > 0:
            text_content = call_tool_result.content[0]
            if hasattr(text_content, 'text') and isinstance(text_content.text, str):
                try:
                    parsed_json = json.loads(text_content.text)
                    return parsed_json
                except json.JSONDecodeError:
                    app.logger.error(f"Tool '{tool_name}' returned a string that is not valid JSON: {text_content.text}")
                    return {"error": "Tool returned non-JSON string", "data": text_content.text}
        # Any other shape is unexpected; caught below and reported as an error.
        raise RuntimeError(f"Unexpected tool result format for '{tool_name}': {call_tool_result}")
    except Exception as e:
        app.logger.error(f"Error during tool invocation for '{tool_name}': {e}", exc_info=True)
        return {"error": f"An exception occurred while invoking tool '{tool_name}'."}
def _evaluate_inline_math(json_str: str) -> str:
"""Finds and evaluates simple inline math expressions in a JSON-like string."""
# This regex finds simple numeric expressions like '279+20' or '113 + 32'
math_expr_pattern = re.compile(r'\b(\d+\.?\d*)\s*([+\-*/])\s*(\d+\.?\d*)\b')
# Use a loop to handle multiple expressions until none are left
while True:
match = math_expr_pattern.search(json_str)
if not match:
break
num1_str, op, num2_str = match.groups()
original_expr = match.group(0)
try:
num1 = float(num1_str)
num2 = float(num2_str)
result = 0
if op == '+': result = num1 + num2
elif op == '-': result = num1 - num2
elif op == '*': result = num1 * num2
elif op == '/': result = num1 / num2
# Replace only the first occurrence to avoid infinite loops on tricky strings
json_str = json_str.replace(original_expr, str(result), 1)
except (ValueError, ZeroDivisionError):
# If something goes wrong, just leave the original expression and break
break
return json_str
class AgentState(Enum):
    """States of the PlanExecutor state machine."""
    DECIDING = auto()        # parse the LLM's latest response into an action
    EXECUTING_TOOL = auto()  # a tool command has been parsed and must run
    SUMMARIZING = auto()     # produce the final answer from collected data
    DONE = auto()            # terminal state: finished
    ERROR = auto()           # terminal state: unrecoverable failure
class PlanExecutor:
    def __init__(self, session_id: str, initial_instruction: str, original_user_input: str):
        """Set up the executor for one user request.

        Args:
            session_id: Key into the module-level SESSIONS store.
            initial_instruction: The LLM's first response; seeds the DECIDING state.
            original_user_input: The user's verbatim request, kept for summarization.
        """
        self.session_id = session_id
        self.original_user_input = original_user_input
        self.state = AgentState.DECIDING
        # Raw LLM text to be parsed for the next tool/prompt command.
        self.next_action_str = initial_instruction
        # Structured tool outputs accumulated for final rendering.
        self.collected_data = []
        self.max_steps = 40  # hard cap on agent iterations
        self.active_prompt_plan = None  # rendered text of a selected MCP prompt
        self.active_prompt_name = None
        self.current_command = None     # last parsed command dict
        # Per-item iteration state used by 'column'-scoped tools — see run().
        self.iteration_context = None
    async def run(self):
        """Drive the state machine, yielding SSE-formatted progress events.

        Loops up to max_steps times; dispatches on the current state and
        falls back to summarization if the step budget is exhausted before
        reaching DONE/ERROR.
        """
        for i in range(self.max_steps):
            if self.state in [AgentState.DONE, AgentState.ERROR]: break
            try:
                if self.state == AgentState.DECIDING:
                    yield _format_sse({"step": "Assistant has decided on an action", "details": self.next_action_str}, "llm_thought")
                    async for event in self._handle_deciding(): yield event
                elif self.state == AgentState.EXECUTING_TOOL:
                    tool_name = self.current_command.get("tool_name")
                    # 'column'-scoped tools iterate per column via a dedicated path.
                    if tool_scopes.get(tool_name) == 'column':
                        async for event in self._execute_column_iteration(): yield event
                    else:
                        async for event in self._execute_standard_tool(): yield event
                elif self.state == AgentState.SUMMARIZING:
                    async for event in self._handle_summarizing(): yield event
            except Exception as e:
                # Any unhandled failure terminates the plan with an error event.
                app.logger.error(f"Error in state {self.state.name}: {e}", exc_info=True)
                self.state = AgentState.ERROR
                yield _format_sse({"error": "An error occurred during execution.", "details": str(e)}, "error")
        if self.state not in [AgentState.DONE, AgentState.ERROR]:
            # Step budget exhausted: summarize whatever was collected.
            app.logger.warning("Plan execution finished due to max steps.")
            async for event in self._handle_summarizing(): yield event
    async def _handle_deciding(self):
        """Parse self.next_action_str into the next action (async generator).

        Recognizes, in order: a FINAL_ANSWER marker (-> SUMMARIZING), a fenced
        ```json command block containing either a prompt_name (fetch and render
        the MCP prompt, then ask the LLM for the next step) or a tool_name
        (-> EXECUTING_TOOL). Non-command text either feeds an active per-item
        iteration or triggers summarization.
        """
        if re.search(r'FINAL_ANSWER:', self.next_action_str, re.IGNORECASE):
            self.state = AgentState.SUMMARIZING
            return
        json_match = re.search(r"```json\s*\n(.*?)\n\s*```", self.next_action_str, re.DOTALL)
        if not json_match:
            if self.iteration_context:
                # Plain text during an iteration is treated as the result for
                # the current item; record it and ask the LLM for the next action.
                ctx = self.iteration_context
                current_item_name = ctx["items"][ctx["item_index"]]
                ctx["results_per_item"][current_item_name].append(self.next_action_str)
                ctx["action_count_for_item"] += 1
                await self._get_next_action_from_llm()
                return
            app.logger.warning(f"LLM response not a tool command or FINAL_ANSWER. Summarizing. Response: {self.next_action_str}")
            self.state = AgentState.SUMMARIZING
            return
        command_str = json_match.group(1).strip()
        # --- START: MODIFIED LOGIC ---
        # First, parse the JSON to identify the tool being called
        try:
            temp_command = json.loads(command_str)
            tool_name = temp_command.get("tool_name")
            # Only apply the math evaluation if it's a chart tool
            if tool_name in mcp_charts:
                corrected_command_str = _evaluate_inline_math(command_str)
                command = json.loads(corrected_command_str)
                if command_str != corrected_command_str:
                    app.logger.info(f"Corrected inline math in chart JSON. Corrected string: {corrected_command_str}")
            else:
                command = temp_command  # Use the already parsed command
        except json.JSONDecodeError as e:
            app.logger.error(f"JSON parsing failed. Error: {e}. Original string was: {command_str}")
            # Re-raise; run() converts this into an ERROR state + SSE event.
            raise e
        # --- END: MODIFIED LOGIC ---
        self.current_command = command
        if "prompt_name" in command:
            prompt_name = command.get("prompt_name")
            self.active_prompt_name = prompt_name
            arguments = command.get("arguments", command.get("parameters", {}))
            # Normalize db_name to database_name for prompts
            if 'db_name' in arguments and 'database_name' not in arguments:
                arguments['database_name'] = arguments.pop('db_name')
            if not mcp_client:
                raise RuntimeError("MCP client is not connected.")
            try:
                get_prompt_result = None
                async with mcp_client.session("teradata_mcp_server") as temp_session:
                    # Call get_prompt with only the name, not the arguments
                    get_prompt_result = await temp_session.get_prompt(name=prompt_name)
                if get_prompt_result is None:
                    raise ValueError("Prompt retrieval from MCP server returned None.")
                # Now, manually render the prompt with the arguments on the client side
                prompt_text = get_prompt_result.content.text if hasattr(get_prompt_result, 'content') and hasattr(get_prompt_result.content, 'text') else str(get_prompt_result)
                self.active_prompt_plan = prompt_text.format(**arguments)
                yield _format_sse({"step": f"Executing Prompt: {prompt_name}", "details": self.active_prompt_plan, "prompt_name": prompt_name}, "prompt_selected")
                await self._get_next_action_from_llm()
            except Exception as e:
                app.logger.error(f"Failed to get or process prompt '{prompt_name}': {e}", exc_info=True)
                raise RuntimeError(f"Could not retrieve the plan for prompt '{prompt_name}'.") from e
        elif "tool_name" in command:
            self.state = AgentState.EXECUTING_TOOL
        else:
            # Neither a prompt nor a tool command: fall back to summarizing.
            self.state = AgentState.SUMMARIZING
async def _execute_standard_tool(self):
    """Execute self.current_command via the MCP client and stream the result.

    On error results, builds a structured error payload for the LLM's context
    (and keeps the raw error out of collected_data). On success, records the
    result; when the result is a chart, the immediately preceding data source
    is dropped from collected_data so the same data is not displayed twice.
    Finally hands control back to the LLM for the next action.

    Yields SSE events; may set state to ERROR on a parameter mismatch.
    """
    yield _format_sse({"step": "Tool Execution Intent", "details": self.current_command}, "tool_result")
    tool_result = await invoke_mcp_tool(self.current_command)
    # --- START: MODIFIED LOGIC ---
    tool_result_str = ""
    if isinstance(tool_result, dict) and "error" in tool_result:
        # If the tool returns an error, format it for the LLM's context
        error_details = tool_result.get("data", tool_result.get("error"))
        tool_result_str = json.dumps({
            "tool_name": self.current_command.get("tool_name"),
            "tool_output": {
                "status": "error",
                "error_message": error_details
            }
        })
        # Don't add raw error objects to collected_data, let the summary handle it
    else:
        # On success, proceed as before
        tool_result_str = json.dumps({"tool_name": self.current_command.get("tool_name"), "tool_output": tool_result})
        if isinstance(tool_result, dict) and tool_result.get("type") == "chart":
            if self.collected_data:
                app.logger.info("Chart generated. Removing previous data source from collected data to avoid duplicate display.")
                self.collected_data.pop()
        self.collected_data.append(tool_result)
    # --- END: MODIFIED LOGIC ---
    if isinstance(tool_result, dict) and tool_result.get("error") == "parameter_mismatch":
        # Arguments failed validation: ask the user to intervene and halt.
        yield _format_sse({"details": tool_result}, "request_user_input")
        self.state = AgentState.ERROR
        return
    yield _format_sse({"step": "Tool Execution Result", "details": tool_result, "tool_name": self.current_command.get("tool_name")}, "tool_result")
    if self.active_prompt_plan and not self.iteration_context:
        # Detect an iterative plan ("for each table"-style) the first time a
        # successful table listing comes back, and set up the iteration state.
        plan_text = self.active_prompt_plan.lower()
        is_iterative_plan = any(keyword in plan_text for keyword in ["cycle through", "for each", "iterate"])
        if is_iterative_plan and self.current_command.get("tool_name") == "base_tableList" and isinstance(tool_result, dict) and tool_result.get("status") == "success":
            items_to_iterate = [res.get("TableName") for res in tool_result.get("results", []) if res.get("TableName")]
            if items_to_iterate:
                self.iteration_context = {
                    "items": items_to_iterate, "item_index": 0, "action_count_for_item": 0,
                    "results_per_item": {item: [] for item in items_to_iterate}
                }
                yield _format_sse({"step": "Starting Multi-Step Iteration", "details": f"Plan requires processing {len(items_to_iterate)} items."})
    yield _format_sse({"step": "Thinking about the next action...", "details": "The agent is reasoning based on the current context."})
    await self._get_next_action_from_llm(tool_result_str=tool_result_str)
async def _execute_column_iteration(self):
    """Run a column-scoped tool for one named column or over every column.

    If self.current_command already names a column (col_name/column_name),
    execute it once, record the result, and resume the main plan. Otherwise
    fetch the table's column list via base_columnDescription and invoke the
    tool once per column, aggregating results either into the active
    iteration context or into self.collected_data.

    Yields SSE events; sets state to ERROR (and stops the whole iteration)
    on a parameter_mismatch; raises ValueError if the column list cannot
    be retrieved.
    """
    base_command = self.current_command
    tool_name = base_command.get("tool_name")
    # The LLM sometimes emits "parameters" instead of "arguments"; accept both.
    base_args = base_command.get("arguments", base_command.get("parameters", {}))
    db_name = base_args.get("db_name")
    table_name = base_args.get("table_name")
    specific_column = base_args.get("col_name") or base_args.get("column_name")
    if specific_column:
        # Single-column fast path: run once and hand control back to the plan.
        yield _format_sse({"step": "Tool Execution Intent", "details": base_command}, "tool_result")
        col_result = await invoke_mcp_tool(base_command)
        if isinstance(col_result, dict) and col_result.get("error") == "parameter_mismatch":
            yield _format_sse({"details": col_result}, "request_user_input")
            self.state = AgentState.ERROR
            return
        yield _format_sse({"step": f"Tool Execution Result for column: {specific_column}", "details": col_result, "tool_name": tool_name}, "tool_result")
        self.collected_data.append(col_result)
        tool_result_str = json.dumps({"tool_name": tool_name, "tool_output": col_result})
        yield _format_sse({"step": "Thinking about the next action...", "details": "Single column execution complete. Resuming main plan."})
        await self._get_next_action_from_llm(tool_result_str=tool_result_str)
        return
    yield _format_sse({"step": f"Column tool detected: {tool_name}", "details": "Fetching column list to begin iteration."})
    cols_command = {"tool_name": "base_columnDescription", "arguments": {"db_name": db_name, "obj_name": table_name}}
    cols_result = await invoke_mcp_tool(cols_command)
    if not (cols_result and isinstance(cols_result, dict) and cols_result.get('status') == 'success' and cols_result.get('results')):
        raise ValueError(f"Failed to retrieve column list for iteration. Response: {cols_result}")
    all_columns = cols_result.get('results', [])
    columns_to_iterate = all_columns
    all_column_results = []
    for column_info in columns_to_iterate:
        col_name = column_info.get("ColumnName")
        iter_args = base_args.copy()
        iter_args['col_name'] = col_name
        if db_name and table_name and '.' not in table_name:
            # Fold db_name into a qualified table name and drop the separate
            # db_name argument so it is not passed twice.
            iter_args["table_name"] = f"{db_name}.{table_name}"
            if 'db_name' in iter_args: del iter_args["db_name"]
        iter_command = {"tool_name": tool_name, "arguments": iter_args}
        yield _format_sse({"step": "Tool Execution Intent", "details": iter_command}, "tool_result")
        col_result = await invoke_mcp_tool(iter_command)
        if isinstance(col_result, dict) and col_result.get("error") == "parameter_mismatch":
            yield _format_sse({"details": col_result}, "request_user_input")
            self.state = AgentState.ERROR
            return  # Stop the entire iteration if one column fails validation
        yield _format_sse({"step": f"Tool Execution Result for column: {col_name}", "details": col_result, "tool_name": tool_name}, "tool_result")
        all_column_results.append(col_result)
    if self.iteration_context:
        # Multi-table iteration active: file results under the current item.
        ctx = self.iteration_context
        current_item_name = ctx["items"][ctx["item_index"]]
        ctx["results_per_item"][current_item_name].append(all_column_results)
        ctx["action_count_for_item"] += 1
    else:
        self.collected_data.append(all_column_results)
    tool_result_str = json.dumps({"tool_name": tool_name, "tool_output": all_column_results})
    yield _format_sse({"step": "Thinking about the next action...", "details": "Column iteration complete. Resuming main plan."})
    await self._get_next_action_from_llm(tool_result_str=tool_result_str)
async def _get_next_action_from_llm(self, tool_result_str: str | None = None):
    """Build a mode-appropriate prompt, ask the LLM for the next action, and
    transition the agent to the DECIDING state.

    Args:
        tool_result_str: JSON string summarizing the last tool call's output,
            appended to the prompt when present; None when no tool just ran.

    Raises:
        ValueError: if the LLM returns an empty response.
    """
    prompt_for_next_step = ""
    # BUGFIX: check iteration_context BEFORE active_prompt_plan. An iteration
    # context only exists while a plan is active (its prompts embed the plan
    # text below), so with the old order the iteration branch was unreachable
    # and the generic plan prompt was always used instead.
    if self.iteration_context:
        ctx = self.iteration_context
        current_item_name = ctx["items"][ctx["item_index"]]
        # BUGFIX: the standard error payload built by _execute_standard_tool
        # contains '"status": "error"' (no '"error":' key), so the previous
        # '"error":' substring check never fired for it. Matching the bare
        # quoted token catches both the "error" key and the "error" value.
        last_tool_failed = bool(tool_result_str) and '"error"' in tool_result_str.lower()
        if last_tool_failed:
            prompt_for_next_step = (
                "The last tool call failed. Analyze the error message in the history.\n"
                f"You are still working on item: **`{current_item_name}`**.\n"
                "Based on the error, is the tool incompatible with the parameters (e.g., wrong data type)?\n"
                "- If it is an incompatibility issue, acknowledge the error and **skip this step**. Determine the next logical step in the **ORIGINAL PLAN** for the current item.\n"
                "- If it is a different kind of error, try to correct it. If you cannot, ask for help.\n"
                f"--- ORIGINAL PLAN ---\n{self.active_prompt_plan}\n\n"
            )
        else:
            # Re-sync the iteration pointer if the last tool call targeted a
            # different table than the one we thought was current.
            # Consistency: accept "parameters" as an alias for "arguments",
            # as the rest of the file does.
            last_cmd = self.current_command or {}
            last_args = last_cmd.get("arguments", last_cmd.get("parameters", {}))
            last_tool_table = last_args.get("table_name", "")
            if last_tool_table and last_tool_table != current_item_name:
                try:
                    ctx["item_index"] = ctx["items"].index(last_tool_table)
                    ctx["action_count_for_item"] = 1
                    current_item_name = last_tool_table
                except ValueError:
                    # Table isn't part of the iteration list; keep the pointer.
                    pass
            if ctx["action_count_for_item"] >= 4:
                # Heuristic cap: after 4 actions, move on to the next item.
                ctx["item_index"] += 1
                ctx["action_count_for_item"] = 0
                if ctx["item_index"] >= len(ctx["items"]):
                    # All items done: tear down iteration, request synthesis.
                    self.iteration_context = None
                    prompt_for_next_step = (
                        "You have successfully completed all steps for all items in the iterative phase of the plan. "
                        "All results are now in the conversation history. Your next and final task is to proceed to the next major phase of the **ORIGINAL PLAN** (Phase 3). "
                        "This final phase requires you to synthesize all the information you have gathered into a comprehensive dashboard or report. "
                        "This is a text generation task. Do not call any more tools. "
                        "Your response **MUST** start with `FINAL_ANSWER:`."
                    )
                else:
                    current_item_name = ctx["items"][ctx["item_index"]]
                    prompt_for_next_step = (
                        f"You have finished all steps for the previous item. Now, begin Phase 2 for the **next item**: `{current_item_name}`. "
                        "According to the original plan, what is the first step for this new item?"
                    )
            else:
                prompt_for_next_step = (
                    "You are executing a multi-step, iterative plan.\n\n"
                    f"--- ORIGINAL PLAN ---\n{self.active_prompt_plan}\n\n"
                    "--- CURRENT FOCUS ---\n"
                    f"- You are working on item: **`{current_item_name}`**.\n"
                    f"- You have taken {ctx['action_count_for_item']} action(s) for this item so far.\n"
                    "- The result of the last action is in the conversation history.\n\n"
                    "--- YOUR NEXT TASK ---\n"
                    # BUGFIX: this line was missing its f-prefix, so the
                    # literal text "{current_item_name}" was sent to the LLM.
                    f"1. Look at the **ORIGINAL PLAN** to see what the next step in the sequence is for the current item (`{current_item_name}`).\n"
                    "2. Execute the correct next step. Provide a tool call in a `json` block or perform the required text generation."
                )
    elif self.active_prompt_plan:
        # Plan-following mode (no per-item iteration).
        app.logger.info("Applying generic plan-aware reasoning for next step.")
        last_tool_name = self.current_command.get("tool_name") if self.current_command else "N/A"
        prompt_for_next_step = (
            "You are executing a multi-step plan. Your goal is to follow it precisely.\n\n"
            f"--- ORIGINAL PLAN ---\n{self.active_prompt_plan}\n\n"
            "--- CURRENT STATE ---\n"
            f"- The last action was the execution of the tool `{last_tool_name}`.\n"
            "- The result of this tool call is now in the conversation history.\n\n"
            "--- YOUR TASK ---\n"
            "1. **Analyze the ORIGINAL PLAN.** Determine which phase of the plan you have just completed.\n"
            "2. **Determine the NEXT STEP.** Based on the plan, what is the very next action you must take?\n"
            " - If the plan says the next step is to call another tool, provide the JSON for that tool call.\n"
            " - If the plan says the next step is to analyze the previous results and provide a final answer, your response **MUST** start with `FINAL_ANSWER:`. Do not call any more tools.\n"
        )
    else:
        # Ad-hoc mode: no plan, no iteration — chart / finish / next tool.
        prompt_for_next_step = (
            "You have just received data from a tool call. Review the data and your instructions to decide the next step.\n\n"
            "1. **Consider a Chart:** First, review the `--- Charting Rules ---` in your system prompt. Based on the data you just received, would a chart be an appropriate and helpful way to visualize the information for the user?\n\n"
            "2. **Choose Your Action:**\n"
            " - If a chart is appropriate, your next action is to call the correct chart-generation tool. Respond with only the `Thought:` and ```json...``` block for that tool.\n"
            " - If a chart is **not** appropriate and you have all the information needed to answer the user's request, you should provide the final answer. Your response **MUST** start with `FINAL_ANSWER:`.\n"
            " - If you still need more information from other tools, call the next appropriate tool."
        )
    if tool_result_str:
        final_prompt_to_llm = f"{prompt_for_next_step}\n\nThe last tool execution returned the following result. Use this to inform your next action:\n\n{tool_result_str}"
    else:
        final_prompt_to_llm = prompt_for_next_step
    self.next_action_str = await call_llm_api(prompt=final_prompt_to_llm, session_id=self.session_id)
    if not self.next_action_str:
        raise ValueError("LLM failed to provide a response.")
    self.state = AgentState.DECIDING
async def _handle_summarizing(self):
    """Produce and emit the final answer, then mark the agent DONE.

    If the last LLM response already carries FINAL_ANSWER, extract the summary
    and stream the preceding reasoning as a thought event. Otherwise ask the
    LLM for a final summary. The summary plus collected_data is rendered to
    HTML, appended to the session history, and emitted as the final SSE event.
    """
    llm_response = self.next_action_str
    summary_text = ""
    final_answer_match = re.search(r'FINAL_ANSWER:(.*)', llm_response, re.DOTALL | re.IGNORECASE)
    if final_answer_match:
        summary_text = final_answer_match.group(1).strip()
        # Everything before the FINAL_ANSWER marker is the reasoning trace.
        thought_process = llm_response.split(final_answer_match.group(0))[0]
        yield _format_sse({"step": "Agent finished execution", "details": thought_process}, "llm_thought")
    else:
        yield _format_sse({"step": "Plan finished, generating final summary...", "details": "The agent is synthesizing all collected data."})
        final_prompt = (
            "You have executed a multi-step plan. All results are in the history. "
            f"Your final task is to synthesize this information into a comprehensive, natural language answer for the user's original request: '{self.original_user_input}'. "
            "Your response MUST start with `FINAL_ANSWER:`.\n\n"
            "**CRITICAL INSTRUCTIONS:**\n"
            "1. Provide a concise, user-focused summary in plain text or simple markdown.\n"
            "2. **DO NOT** include raw data, SQL code, or complex tables in your summary. The system will format and append this data automatically.\n"
            "3. Do not describe your internal thought process."
        )
        final_llm_response = await call_llm_api(prompt=final_prompt, session_id=self.session_id)
        final_answer_match_inner = re.search(r'FINAL_ANSWER:(.*)', final_llm_response or "", re.DOTALL | re.IGNORECASE)
        if final_answer_match_inner:
            summary_text = final_answer_match_inner.group(1).strip()
        else:
            # Fall back to the raw response (or a stock message) if the LLM
            # ignored the FINAL_ANSWER convention.
            summary_text = final_llm_response or "The agent finished its plan but did not provide a final summary."
    formatter = OutputFormatter(llm_summary_text=summary_text, collected_data=self.collected_data)
    final_html = formatter.render()
    SESSIONS[self.session_id]['history'].append({'role': 'assistant', 'content': final_html})
    yield _format_sse({"final_answer": final_html}, "final_answer")
    self.state = AgentState.DONE
# --- Web Server Routes ---
@app.route("/")
async def index():
return await render_template("index.html")
@app.route("/app-config")
async def get_app_config():
return jsonify({
"all_models_unlocked": APP_CONFIG.ALL_MODELS_UNLOCKED,
"charting_enabled": APP_CONFIG.CHARTING_ENABLED
})
@app.route("/tools")
async def get_tools():
if not mcp_client: return jsonify({"error": "Not configured"}), 400
return jsonify(structured_tools)
@app.route("/prompts")
async def get_prompts():
if not mcp_client: return jsonify({"error": "Not configured"}), 400
return jsonify(structured_prompts)
@app.route("/resources")
async def get_resources_route():
if not mcp_client: return jsonify({"error": "Not configured"}), 400
return jsonify(structured_resources)
@app.route("/charts")
async def get_charts():
if not APP_CONFIG.CHART_MCP_CONNECTED: return jsonify({"error": "Chart server not connected"}), 400
return jsonify(structured_charts)
@app.route("/sessions", methods=["GET"])
async def get_sessions():
session_summaries = [
{"id": sid, "name": s_data["name"], "created_at": s_data["created_at"]}
for sid, s_data in SESSIONS.items()
]
session_summaries.sort(key=lambda x: x["created_at"], reverse=True)
return jsonify(session_summaries)
@app.route("/session/<session_id>", methods=["GET"])
async def get_session_history(session_id):
if session_id in SESSIONS:
return jsonify(SESSIONS[session_id]["history"])
return jsonify({"error": "Session not found"}), 404
def get_full_system_prompt(base_prompt_text, charting_intensity_val):
global tools_context, prompts_context, charts_context
chart_instructions = CHARTING_INSTRUCTIONS.get(charting_intensity_val, CHARTING_INSTRUCTIONS['none'])