Spring Boot WebClient Flux 流式接入 DeepSeek

package cn.netkiller.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.Map;
/**
 * Streams chat-completion text from the configured DeepSeek endpoint.
 *
 * <p>The endpoint URL, bearer token and model name are injected from the
 * {@code deepseek.url}, {@code deepseek.authorization} and {@code deepseek.model}
 * properties.
 */
@Service
@Slf4j
public class AigcService {
    /** Emitted downstream once the upstream signals the end of the stream. */
    private static final String END_MARKER = "[END]";
    /** Emitted downstream in place of an error signal. */
    private static final String ERROR_MARKER = "[ERROR]";
    /** Payload the upstream sends as its final event. */
    private static final String DONE_PAYLOAD = "[DONE]";
    /** SSE field name that may still be attached to a raw chunk. */
    private static final String SSE_DATA_PREFIX = "data:";
    /** Shared mapper: ObjectMapper is costly to create, so it is not rebuilt per chunk. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // NOTE(review): not referenced anywhere in this class; kept so the existing Gson import stays in use.
    private final Gson gson = new Gson();

    @Value("${deepseek.url}")
    private String url;
    @Value("${deepseek.authorization}")
    private String authorization;
    @Value("${deepseek.model}")
    private String model;

    /**
     * Posts {@code messages} to {@code /chat/completions} with {@code stream=true} and
     * returns the generated text fragments as they arrive.
     *
     * <p>The returned flux emits each non-empty {@code choices[0].delta.content} value,
     * then {@code "[END]"} when the upstream sends its {@code [DONE]} event. Any error in
     * the pipeline is logged and replaced by a single {@code "[ERROR]"} element, so
     * subscribers never receive an error signal.
     *
     * @param device   not used by this method
     * @param taskId   not used by this method
     * @param speakId  not used by this method
     * @param messages chat history, sent as the {@code messages} field of the request body;
     *                 must not be {@code null} ({@link Map#of} rejects null values)
     * @return a flux of text fragments followed by the end marker
     */
    public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {
        WebClient webClient = WebClient.builder()
                .baseUrl(url)
                .defaultHeader("Content-Type", "application/json")
                .defaultHeader("Authorization", "Bearer " + authorization)
                .build();

        Map<String, Object> requestBody = Map.of(
                "model", model,
                "messages", messages,
                "stream", true
        );

        return webClient.post()
                .uri("/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .map(AigcService::extractText)
                // Blank chunks, role-only deltas and unparsable lines all map to "".
                .filter(text -> !text.isEmpty())
                .onErrorResume(e -> {
                    log.error("Error processing stream", e);
                    return Flux.just(ERROR_MARKER);
                });
    }

    /**
     * Converts one raw stream chunk into the text to emit downstream.
     *
     * <p>Depending on which decoder handled the response, a chunk may arrive either as a
     * raw {@code data: {...}} line or as the bare payload with the SSE field name already
     * removed, so the prefix is treated as optional.
     *
     * @param chunk one element of the response body
     * @return the delta text, {@code "[END]"} for the terminal event, or {@code ""} when
     *         the chunk carries no text
     */
    private static String extractText(String chunk) {
        String payload = chunk.trim();
        if (payload.startsWith(SSE_DATA_PREFIX)) {
            payload = payload.substring(SSE_DATA_PREFIX.length()).trim();
        }
        if (payload.isEmpty()) {
            return "";
        }
        if (DONE_PAYLOAD.equals(payload)) {
            return END_MARKER;
        }
        try {
            JsonNode content = OBJECT_MAPPER.readTree(payload)
                    .path("choices").path(0).path("delta").path("content");
            // isTextual() separates a JSON null / missing field from a real token whose
            // text happens to be "null"; asText() alone would conflate the two.
            return content.isTextual() ? content.asText() : "";
        } catch (JsonProcessingException e) {
            // Best effort: skip lines that are not JSON instead of failing the stream.
            log.debug("Skipping unparsable stream chunk", e);
            return "";
        }
    }
}