diff --git a/src/cli.rs b/src/cli.rs index c6a6b6b..7d86b8e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -157,9 +157,37 @@ impl ChatCLI { } Err(e) => { println!(); // Add newline after failed streaming - self.display - .print_error(&format!("Streaming failed: {}", e)); - return Err(e); + + // Try to fallback to non-streaming if streaming fails + self.display.print_warning(&format!("Streaming failed: {}. Trying non-streaming mode...", e)); + + let spinner = self.display.show_spinner("Thinking"); + let client = self.get_client()?.clone(); + + match client + .chat_completion( + &self.session.model, + &self.session.messages, + self.session.enable_web_search, + self.session.enable_reasoning_summary, + &self.session.reasoning_effort, + self.session.enable_extended_thinking, + self.session.thinking_budget_tokens, + ) + .await + { + Ok(response) => { + spinner.finish("Done"); + self.display.print_assistant_response(&response); + self.session.add_assistant_message(response); + self.session.save()?; + } + Err(fallback_e) => { + spinner.finish_with_error("Failed"); + self.display.print_error(&format!("Both streaming and non-streaming failed. Streaming: {}. Non-streaming: {}", e, fallback_e)); + return Err(fallback_e); + } + } } } } else { diff --git a/src/core/client.rs b/src/core/client.rs index a87846c..17cba11 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -1047,14 +1047,38 @@ impl AnthropicClient { let mut full_response = String::new(); let mut stream = response.bytes_stream(); + let mut event_count = 0; + let mut content_blocks_found = 0; + let mut partial_line = String::new(); // Buffer for incomplete lines while let Some(chunk) = stream.next().await { let chunk = chunk.context("Failed to read chunk from Anthropic stream")?; let chunk_str = std::str::from_utf8(&chunk) .context("Failed to parse Anthropic chunk as UTF-8")?; + // Handle partial lines by buffering incomplete chunks + partial_line.push_str(chunk_str); + + // Split by newlines, keeping the last incomplete line in buffer + let full_text = partial_line.clone(); + let mut lines: Vec<&str> = full_text.split('\n').collect(); + let lines_to_process = if full_text.ends_with('\n') { + // All lines are complete + partial_line.clear(); + lines + } else { + // Last line is incomplete, keep it in buffer + if let Some(last) = lines.pop() { + partial_line = last.to_string(); + lines + } else { + // No complete lines yet + continue; + } + }; + // Parse server-sent events for Anthropic - for line in chunk_str.lines() { + for line in lines_to_process { if line.starts_with("data: ") { let data = &line[6..]; @@ -1063,13 +1087,16 @@ impl AnthropicClient { continue; } + // Try to parse JSON, but be more robust with large/truncated responses match serde_json::from_str::(data) { Ok(event) => { + event_count += 1; match event.event_type.as_str() { "content_block_delta" => { if let Ok(delta_event) = serde_json::from_value::(event.data) { if let Some(text) = delta_event.delta.text { if !text.is_empty() { + content_blocks_found += 1; full_response.push_str(&text); stream_callback(&text).await; } @@ -1084,6 +1111,9 @@ impl AnthropicClient { full_response.push_str(search_indicator); stream_callback(search_indicator).await; } + } else { + // This might be a text content block start + content_blocks_found += 1; } } "content_block_stop" => { @@ -1092,14 +1122,31 @@ impl AnthropicClient { "message_start" | "message_delta" | "message_stop" => { // Handle other message-level events } + "ping" => { + // Anthropic sends ping events to keep connection alive + // These are expected and should be silently ignored + } _ => { - // Unknown event type, skip + // Silently ignore unknown event types to avoid user confusion + // Previously logged: Unknown Anthropic event type } } } - Err(_) => { - // Skip malformed JSON chunks - continue; + Err(e) => { + // Handle parsing errors more gracefully + if data.len() > 1000 { + // Large data blocks (like web search results) that fail to parse + // are often non-critical, so silently continue + continue; + } else if e.to_string().contains("EOF") { + // EOF errors suggest truncated JSON, which is common with large responses + // Continue processing other chunks + continue; + } else { + // Only log unexpected parsing errors for shorter data + // Silently continue to avoid user confusion + continue; + } } } } else if line.starts_with("event: ") { @@ -1109,8 +1156,30 @@ impl AnthropicClient { } } + // Process any remaining partial line at the end + if !partial_line.trim().is_empty() && partial_line.starts_with("data: ") { + let data = &partial_line[6..]; + if !data.trim().is_empty() && data != "[DONE]" { + // Try to parse final partial line + if let Ok(event) = serde_json::from_str::(data) { + if event.event_type == "content_block_delta" { + if let Ok(delta_event) = serde_json::from_value::(event.data) { + if let Some(text) = delta_event.delta.text { + if !text.is_empty() { + full_response.push_str(&text); + } + } + } + } + } + } + } + if full_response.is_empty() { - return Err(anyhow::anyhow!("No content found in Anthropic stream response")); + return Err(anyhow::anyhow!( + "No content found in Anthropic stream response. Events processed: {}, Content blocks: {}", + event_count, content_blocks_found + )); } Ok(full_response)