Springboot WebClient Flux 流式接入 DeepSeek

Springboot WebClient Flux 流式接入 DeepSeek

package cn.netkiller.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Map;

@Service
@Slf4j
public class AigcService {
    private final Gson gson = new Gson();

    @Value("${deepseek.url}")
    private String url;
    @Value("${deepseek.authorization}")
    private String authorization;
    @Value("${deepseek.model}")
    private String model;


    public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {

        WebClient webClient = WebClient.builder()
                .baseUrl(url)
                .defaultHeader("Content-Type", "application/json")
                .defaultHeader("Authorization", "Bearer " + authorization)
                .build();

//        List<Map<String, String>> messages = new ArrayList<Map<String, String>>();
//        messages.add(Map.of("role", "user", "content", content));


        Map<String, Object> requestBody = Map.of(
                "model", model,
                "messages", messages,
                "stream", true
        );

        return webClient.post()
                .uri("/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .filter(chunk -> !chunk.trim().isEmpty())
                .map(chunk -> {
                    if (chunk.trim().equals("data: [DONE]")) {
                        return "[END]";
                    }
                    try {
                        String jsonStr = chunk.replaceFirst("^data: ", "");
                        JsonNode jsonNode = new ObjectMapper().readTree(jsonStr);
                        String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                        return result != null ? result : "";
                    } catch (JsonProcessingException e) {
                        return "";
                    }
                })
                .filter(text -> text != null && !text.equals("null") && !text.isEmpty())
                .onErrorResume(e -> {
                    log.error("Error processing stream", e);
                    return Flux.just("[ERROR]");
                });
    }


}

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

netkiller-BG7NYT

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值