侧边栏壁纸
博主头像
牧云

怀璧慎显,博识谨言。

  • 累计撰写 133 篇文章
  • 累计创建 13 个标签
  • 累计收到 8 条评论

目 录CONTENT

文章目录

联网搜索

秋之牧云
2026-03-17 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

1. SearXNG 搜索引擎

1.1. 联网搜索处理流程

和 RAG 增强检索类似,联网搜索改为从搜索引擎获取最新的数据,然后将最新数据与提示词组合,一起发送给 AI 大模型,最终给出回答

1.2. SearXNG简介

SearXNG 是一款注重隐私保护的开源元搜索引擎,它本身不维护搜索引擎数据库,而是通过聚合多个搜索引擎(如 Google、Bing 等)的结果,整合结果后去重排序返回。同时,它为用户提供匿名、无广告的搜索体验。

1.3. 支持 API 调用

除了通过图形化界面来操作 SearXNG,还可以使用 API 调用的方式。前提需要修改 /config 目录下的 settings.yml 配置文件,在 formats 节点下,添加以 json 格式进行返参配置项

2. 整合 OKHttp3:获取 SearXNG 搜索结果

2.1. OKHttp 介绍

OKHttp 是由 Square 公司开发的开源 HTTP 客户端库,专为 Java 和 Android 应用程序设计。它提供了高效、可靠的 HTTP 通信能力,已成为 Android 开发的事实标准,并在 Java 后端服务中广泛使用。

2.2. 依赖

!-- OKHttp-->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>${okhttp.version}</version>
</dependency>

2.3. 配置

okhttp: # OkHttp 客户端配置
  connect-timeout: 5000    # 建立连接的最大等待时间(毫秒)
  read-timeout: 30000      # 读取数据的最大等待时间(毫秒)
  write-timeout: 15000     # 写入数据的最大等待时间(毫秒)
  max-idle-connections: 200 # 连接池中保持的最大空闲连接数
  keep-alive-duration: 5   # 空闲连接在连接池中的存活时间(分钟)

searxng: # SearXNG 搜索引擎配置
  url: http://localhost:8888/search # SearXNG 服务的 API 端点地址
  count: 50                # 每次从搜索结果中,提取的最大数量
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * @Description: OkHttp 客户端配置类
 **/
@Configuration
public class OkHttpConfig {

    @Bean
    public OkHttpClient okHttpClient(
            @Value("${okhttp.connect-timeout}") int connectTimeout,
            @Value("${okhttp.read-timeout}") int readTimeout,
            @Value("${okhttp.write-timeout}") int writeTimeout,
            @Value("${okhttp.max-idle-connections}") int maxIdleConnections,
            @Value("${okhttp.keep-alive-duration}") int keepAliveDuration) {


        return new OkHttpClient.Builder()
                .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
                .readTimeout(readTimeout, TimeUnit.MILLISECONDS)
                .writeTimeout(writeTimeout, TimeUnit.MILLISECONDS)
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDuration, TimeUnit.MINUTES))
                .build();
    }

}

2.4. 搜索结果实体类

content 字段,需要访问各自的页面链接,拿到页面内容后,再设置 content 字段的值

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SearchResult {

    /**
     * 页面访问链接
     */
    private String url;

    /**
     * 相关性评分
     */
    private Double score;

    /**
     * 页面内容
     */
    private String content;
}

2.5. service 业务层

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quanxiaoha.vector.store.model.SearchResult;
import com.quanxiaoha.vector.store.service.SearXNGService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@Service
@Slf4j
public class SearXNGServiceImpl implements SearXNGService {

    @Resource
    private OkHttpClient  okHttpClient;
    @Resource
    private ObjectMapper objectMapper;
    @Value("${searxng.url}")
    private String searxngUrl;
    @Value("${searxng.count}")
    private int count;

    @Override
    public List<SearchResult> search(String query) {
        // 构建 SearXNG API 请求 URL
        HttpUrl httpUrl = HttpUrl.parse(searxngUrl).newBuilder()
                .addQueryParameter("q", query) // 设置搜索关键词
                .addQueryParameter("format", "json") // 指定返回 JSON 格式
				.addQueryParameter("engines", "bing,quark,sogou,360search") // 指定聚合的目标搜索引擎(配置本地网络能够访问的通的搜索引擎)
                .build();

        // 创建 HTTP GET 请求
        Request request = new Request.Builder()
                        .url(httpUrl)
                        .get()
                        .build();

        // 发送 HTTP 请求
        try (Response response = okHttpClient.newCall(request).execute()) {
            // 判断请求是否成功
            if (response.isSuccessful()) {
                // 拿到返回结果
                String result = response.body().string();
                log.info("## SearXNG 搜索结果: {}", result);

                // 解析 JSON 响应
                JsonNode root = objectMapper.readTree(result);
                JsonNode results = root.get("results"); // 获取结果数组节点

                // 定义 Record 类型:用于临时存储评分和节点引用
                record NodeWithUrlAndScore(double score, JsonNode node) {}

                // 处理搜索结果流:
                // 1. 提取评分
                // 2. 按评分降序排序
                // 3. 限制返回结果数量 (比如只提取评分最高的 50 条搜索结果)
                List<NodeWithUrlAndScore> nodesWithScore = StreamSupport.stream(results.spliterator(), false)
                        .map(node -> {
                            // 只提取分数,避免构建完整对象
                            double score = node.path("score").asDouble(0.0); // 提取评分
                            return new NodeWithUrlAndScore(score, node);
                        })
                        .sorted(Comparator.comparingDouble(NodeWithUrlAndScore::score).reversed()) // 按评分降序
                        .limit(count) // 限制返回结果数量
                        .toList();

                // 转换为 SearchResult 对象集合
                return nodesWithScore.stream()
                        .map(n -> {
                            JsonNode node = n.node();
                            String originalUrl = node.path("url").asText(""); // 提取 URL
                            return SearchResult.builder()
                                    .url(originalUrl)
                                    .score(n.score()) // 保留评分
                                    .build();
                        })
                        .collect(Collectors.toList());
            }
        } catch (Exception e) {
            log.error("", e);
        }
        // 返回空集合
        return Collections.emptyList() ;
    }

}

2.6. 新增 controller 控制器

import com.quanxiaoha.vector.store.model.SearchResult;
import com.quanxiaoha.vector.store.service.SearXNGService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;


/**
 * @Description: 联网搜索
 **/
@RestController
@RequestMapping("/network")
public class NetworkSearchController {

    @Resource
    private SearXNGService searXNGService;

    /**
     * 测试
     * @param message
     * @return
     */
    @GetMapping(value = "/test")
    public List<SearchResult> generateStream(@RequestParam(value = "message") String message) {
        // 调用 SearXNG 获取搜索结果
        List<SearchResult> searchResults = searXNGService.search(message);

        return searchResults;
    }

}

3. 编写 “联网搜索” 提示词模板

有结果页面的 url 访问链接后,就可以按相关性评分降序排列,获取评分最高的部分页面 url(比如 49 个网页), 并发发送请求,再拿到页面的完整 html 内容,将它们作为上下文,组合成 “增强提示词”,提交给 AI 大模型来回答。

3.1. 处理流程

https://img.quanxiaoha.com/quanxiaoha/175395538914846

  1. 提取从 SearXNG 获取到的,与用户问题匹配度较高的页面 URL, 比如 49 个(可配置);

  2. 并发去请求这些 URL, 获取页面完整的 html 代码;

  3. 提取 html 代码中的文本;

  4. 拼装所有页面的内容,充当上下文,组合成 “增强提示词”;

  5. 调用 AI 大模型,让其基于上下文信息,来回答用户问题;

3.2. 提示词模板

## 用户问题
{question}

## 上下文
上下文信息如下,由以下符号包围:
---------------------
{context}
---------------------

请根据上下文内容来回复用户:

## 任务要求

1. 综合分析上下文内容,提取与用户问题直接相关的核心信息
2. 特别关注匹配度较高的结果
3. 对矛盾信息进行交叉验证,优先采用多个来源证实的信息
4. 请避免使用诸如 “根据上下文……” 或 “所提供的信息……” 这类表述
5. 当上下文内容不足或存在知识缺口时,再考虑使用本身已拥有的先验知识

4. 自定义线程池:通过 CompletableFuture 并发获取搜索结果页面内容

4.1. 自定义线程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Description: 自定义线程池
 **/
@Configuration
public class ThreadPoolConfig {

    /**
     * HTTP 请求线程池(IO 密集型任务)
     * @return
     */
    @Bean("httpRequestExecutor")
    public ThreadPoolTaskExecutor httpRequestExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50); // 核心线程数(保持常驻)
        executor.setMaxPoolSize(200); // 最大线程数(突发流量时扩容)
        executor.setQueueCapacity(1000); // 任务队列容量(缓冲突发请求)
        executor.setKeepAliveSeconds(120); // 空闲线程存活时间(秒)
        executor.setThreadNamePrefix("http-fetcher-"); // 线程名前缀(便于监控)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(由调用线程执行)
        executor.initialize(); // 初始化线程池
        return executor;
    }

    /**
     * 结果处理线程池(CPU 密集型任务)
     * @return
     */
    @Bean("resultProcessingExecutor")
    public ThreadPoolTaskExecutor resultProcessingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); // 核心线程数(等于CPU核心数)
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); // 最大线程数(不超过CPU核心数2倍)
        executor.setQueueCapacity(200); // 较小队列(避免任务堆积)
        executor.setThreadNamePrefix("result-processor-"); // 线程名前缀(便于监控)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略(直接抛出异常)
        executor.initialize(); // 初始化线程池
        return executor;
    }
}

  • httpRequestExecutor : 此线程池用于发送 Http 请求,任务场景属于 IO 密集型,因为 Http 请求涉及网络等待(延迟较高),线程大部分时间处于等待状态,所以设置了较多的线程数,能够提升并发处理性能;

  • resultProcessingExecutor : 此线程池用于提取每个请求的结果,任务场景属于是 CPU 密集型,所以,线程数定义的较少,防止 CPU 调度时,上下文切换带来的性能损耗;

4.2. 定义 service 批量获取

package com.quanxiaoha.vector.store.service.impl;

import com.quanxiaoha.vector.store.model.SearchResult;
import com.quanxiaoha.vector.store.service.SearchResultContentFetcherService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jsoup.Jsoup;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @Author: 犬小哈
 * @Date: 2025/7/30 12:15
 * @Version: v1.0.0
 * @Description: 页面内容提取
 **/
@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {

    @Resource(name = "resultProcessingExecutor")
    private ThreadPoolTaskExecutor processingExecutor;

    @Resource(name = "httpRequestExecutor")
    private ThreadPoolTaskExecutor httpExecutor;
    
    @Resource
    private OkHttpClient okHttpClient;

    /**
     * 并发批量获取搜索结果页面的内容
     *
     * @param searchResults
     * @param timeout 超时时间
     * @param unit 单位
     * @return
     */
    @Override
    public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
        // 步骤1:为每个搜索结果创建独立的异步获取任务
        List<CompletableFuture<SearchResult>> futures = searchResults.stream()
                .map(result -> asynFetchContentForResult(result, timeout, unit))
                .toList();

        // 步骤2:合并所有独立任务为一个聚合任务
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );

        // 步骤3:当所有任务完成后收集结果
        return allFutures.thenApplyAsync(v -> // 所有任务完成后触发
                        futures.stream() // 遍历所有已完成的任务
                                .map(CompletableFuture::join) // 提取每个任务的结果
                                .collect(Collectors.toList()), // 合并所有结果为一个集合,并返回
                processingExecutor // 使用专用的 processingExecutor 线程池
        );
    }

    /**
     * 异步获取单个 SearchResult 对象对应的页面内容
     * @param result
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return
     */
    private CompletableFuture<SearchResult> asynFetchContentForResult(
            SearchResult result,
            long timeout,
            TimeUnit unit) {

        // 异步线程处理
        return CompletableFuture.supplyAsync(() -> {
                    // 获取 HTML 内容
                    String html = syncFetchHtmlContent(result.getUrl());

                    return SearchResult.builder()
                            .url(result.getUrl())
                            .score(result.getScore())
                            .content(html)
                            .build();

                }, httpExecutor) // 使用专用的 httpExecutor 线程池
                // 超时处理
                .completeOnTimeout(createFallbackResult(result), timeout, unit)
                // 异常处理
                .exceptionally(e -> {
                    // 记录错误日志
                    log.error("## 获取页面内容异常, URL: {}", result.getUrl(), e);
                    return createFallbackResult(result);
                });
    }

    /**
     * 创建回退结果(请求失败时使用)
     */
    private SearchResult createFallbackResult(SearchResult searchResult) {
        return SearchResult.builder()
                .url(searchResult.getUrl())
                .score(searchResult.getScore())
                .content("") // 空字符串表示获取页面内容失败
                .build();
    }
}

5. Jsoup 提取网页纯文本:降低提示词 Token 消耗

5.1. Jsoup 介绍

Jsoup 是一个强大的 Java HTML 解析、操作、清理和数据抓取库。专为处理 “现实世界” 的 HTML(即使格式混乱)而设计。

5.2. 依赖

<!-- Jsoup -->
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>${jsoup.version}</version>
</dependency>

5.3. 提取 HTML 中的纯文本

// 步骤3:当所有任务完成后收集结果
return allFutures.thenApplyAsync(v -> // 所有任务完成后触发
                futures.stream() // 遍历所有已完成的任务
                        .map(future -> {
                            SearchResult searchResult = future.join();
                            // 获取页面 HTML 代码
                            String html = searchResult.getContent();

                            if (StringUtils.isNotBlank(html)) {
                                // 提取 HTML 中的文本
                                searchResult.setContent(Jsoup.parse(html).text());
                            }

                            return searchResult;
                        }) // 提取每个任务的结果
                        .collect(Collectors.toList()), // 合并所有结果为一个集合,并返回
        processingExecutor // 使用专用的 processingExecutor 线程池
);

5.4. 增加聚合的搜索引擎

超时、反爬虫、异步渲染、请求异常等拿不到网页内容

可以增加提取搜索结果的最大数量,如 50 页,以免样本数太少

// 构建 SearXNG API 请求 URL
HttpUrl httpUrl = HttpUrl.parse(searxngUrl).newBuilder()
        .addQueryParameter("q", query) // 设置搜索关键词
        .addQueryParameter("format", "json") // 指定返回 JSON 格式
        .addQueryParameter("engines", "wolframalpha,presearch,seznam,mwmbl,encyclosearch,bpb,mojeek,right dao,wikimini,crowdview,searchmysite,bing,naver,360search") // 指定聚合的目标搜索引擎(配置本地网络能够访问的通的搜索引擎)
        .build();

6. 自定义 “联网搜索” Advisor

自定义一个 Advisor, 再调用 AI 大模型之前,实现 “提示词增强”,一并提交给 AI 大模型,将 “联网搜索” 功能的整体流程串起来。

6.1. 自定义 Advisor

import com.quanxiaoha.vector.store.model.SearchResult;
import com.quanxiaoha.vector.store.service.SearXNGService;
import com.quanxiaoha.vector.store.service.SearchResultContentFetcherService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisor;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @Description: 联网搜索 Advisor
 **/
@Slf4j
public class NetworkSearchAdvisor implements StreamAdvisor {
    
    private final SearXNGService searXNGService;
    private final SearchResultContentFetcherService searchResultContentFetcherService;

    /**
     * 联网搜索提示词模板
     */
    private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate("""
            ## 用户问题
            {question}
            
            ## 上下文
            上下文信息如下,由以下符号包围:
            ---------------------
            {context}
            ---------------------
            
            请根据上下文内容来回复用户:
            
            ## 任务要求
            
            1. 综合分析上下文内容,提取与用户问题直接相关的核心信息
            2. 特别关注匹配度较高的结果
            3. 对矛盾信息进行交叉验证,优先采用多个来源证实的信息
            4. 请避免使用诸如 “根据上下文……” 或 “所提供的信息……” 这类表述
            5. 当上下文内容不足或存在知识缺口时,再考虑使用本身已拥有的先验知识
            """);

    public NetworkSearchAdvisor(SearXNGService searXNGService,  SearchResultContentFetcherService searchResultContentFetcherService) {
        this.searXNGService = searXNGService;
        this.searchResultContentFetcherService = searchResultContentFetcherService;
    }

    @Override
    public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
        // 获取用户输入的提示词
        Prompt prompt = chatClientRequest.prompt();
        UserMessage userMessage = prompt.getUserMessage();

        // 调用 SearXNG 获取搜索结果
        List<SearchResult> searchResults = searXNGService.search(userMessage.getText());

        // 并发请求,获取搜索结果页面的内容
        CompletableFuture<List<SearchResult>> resultsFuture = searchResultContentFetcherService.batchFetch(searchResults, 7, TimeUnit.SECONDS);

        List<SearchResult> results = resultsFuture.join();

        // 过滤掉获取失败的结果
        List<SearchResult> successfulResults = results.stream()
                .filter(r -> StringUtils.isNotBlank(r.getContent()))
                .toList();

        // 构建搜索结果上下文信息
        String searchContext = buildContext(successfulResults);

        // 填充提示词占位符,转换为 Prompt 提示词对象
        Prompt newPrompt = DEFAULT_PROMPT_TEMPLATE.create(Map.of("question", userMessage.getText(),
                "context", searchContext));

        log.info("## 重新构建的增强提示词: {}", newPrompt.getUserMessage().getText());

        // 重新构建 ChatClientRequest,设置重新构建的 “增强提示词”
        ChatClientRequest newChatClientRequest = ChatClientRequest.builder().prompt(newPrompt).build();

        return streamAdvisorChain.nextStream(newChatClientRequest);
    }

    /**
     * 构建上下文
     * @param successfulResults
     * @return
     */
    private String buildContext(List<SearchResult> successfulResults) {
        int i = 1;
        StringBuilder contextTemp = new StringBuilder();
        for (SearchResult searchResult : successfulResults) {
            contextTemp.append(String.format("""
                        ### 来源 %s | 相关性: %s
                        - URL: %s 
                        - 页面文本:
                        %s
                        \n
                        """, i, searchResult.getScore(), searchResult.getUrl(), searchResult.getContent()));
            i++;
        }

        return contextTemp.toString();
    }

    @Override
    public String getName() {
        // 获取类名称
        return this.getClass().getSimpleName();
    }

    @Override
    public int getOrder() {
        return 1; // order 值越小,越先执行
    }
}

/**
 * 流式对话
 * @param message
 * @return
 */
@GetMapping(value = "/chat", produces = "text/html;charset=utf-8")
public Flux<String> chat(@RequestParam(value = "message") String message) {

    // 流式输出
    return chatClient.prompt()
            .user(message) // 提示词
            .advisors(new NetworkSearchAdvisor(searXNGService, searchResultContentFetcherService)) // 使用自定义的联网搜索 Advisor
            .stream()
            .content();
}

0

评论区