生成式人工智能 (GenAI) 支持先進的人工智能用例和創新,但也改變了企業架構的外觀。大型語言模型 (LLM)、向量數據庫和檢索增強生成 (RAG) 需要新的數據集成模式和數據工程最佳實踐。使用 Apache Kafka 和 Apache Flink 進行數據流處理是大規模實時攝取和整理傳入數據集、連接各種數據庫和分析平臺以及解耦獨立業務部門和數據產品的關鍵。這篇博文探討了事件流與傳統請求響應 API 和數據庫之間的可能架構、示例以及權衡。

Apache Kafka 和 GenAI 的用例
生成式人工智能 (GenAI) 是用于自然語言處理 (NLP)、圖像生成、代碼優化和其他任務的下一代人工智能引擎。它可以幫助現實世界中的許多項目實現服務臺自動化、客戶與聊天機器人的對話、社交網絡中的內容審核以及許多其他用例。
Apache Kafka 成為這些機器學習平臺中的主要編排層,用于集成各種數據源、大規模處理和實時模型推理。
使用 Kafka 進行數據流傳輸已經為許多 GenAI 基礎設施和軟件產品提供支持。可能出現非常不同的情況:
- 數據流作為整個機器學習基礎設施的數據結構
- 模型評分,通過流處理進行實時預測和內容生成
- 生成帶有輸入文本、語音或圖像的流數據管道
- 大型語言模型的實時在線訓練
我在博客文章“Apache Kafka 作為 GenAI 的關鍵任務數據結構。”
下面介紹了大型語言模型 (LLM)、檢索增強生成 (RAG) 與向量數據庫和語義搜索以及 Apache Kafka 和 Flink 的數據流相結合的具體架構。
為什么生成式人工智能與傳統機器學習架構不同
機器學習 (ML) 允許計算機找到隱藏的見解,而無需對要查找的位置進行編程。這稱為模型訓練,是分析大數據集的批處理過程。輸出是一個二進制文件,即分析模型。
應用程序將這些模型應用于新的傳入事件以進行預測。這稱為模型評分,可以通過將模型嵌入到應用程序中或通過對模型服務器(部署模型)進行請求響應 API 調用來實時或批量進行。
但是,與傳統的機器學習流程相比,法學碩士和 GenAI 具有不同的要求和模式,正如我的前同事 Michael Drogalis 在兩個簡單、清晰的圖表中所解釋的那樣。
具有復雜數據工程的傳統預測機器學習
預測人工智能可以做出預測。專門構建的模型。線下培訓。這就是我們在過去十年左右的時間里進行機器學習的方式。
在傳統的機器學習中,大部分數據工程工作發生在模型創建時。特征工程和模型訓練需要大量的專業知識和努力:
新用例需要數據工程師和數據科學家構建新模型。
使用大型語言模型 (LLM) 的生成式 AI 實現 AI 民主化
生成人工智能 (GenAI) 創建內容。可重復使用的模型。情境學習。
但是對于大型語言模型,每次查詢都會發生數據工程。不同的應用程序重復使用相同的模型:
GenAI 用例的大型語言模型面臨的挑戰
大型語言模型 (LLM) 是可重用的。這使得人工智能的民主化成為可能,因為并非每個團隊都需要人工智能專業知識。相反,人工智能專業知識水平較低就足以使用現有的法學碩士。
但是,LLM 存在一些巨大的權衡:
- 昂貴的培訓:像 ChatGPT 這樣的法學碩士需要花費數百萬美元的計算資源(這不包括構建模型所需的專業知識)
- 靜態數據:法學碩士“被及時凍結”,這意味著模型沒有最新信息。
- 缺乏領域知識:法學碩士通常從公共數據集學習。因此,數據工程師抓取萬維網并將其輸入到模型訓練中。然而,企業需要在自己的環境中使用 LMM 來提供商業價值。
- 愚蠢:法學碩士不像人類那么聰明。例如,ChatGPT 甚至無法計算您提示的句子中的單詞數。
這些挑戰會產生所謂的幻覺……
避免幻覺以生成可靠的答案
幻覺,即最佳猜測答案,是結果,法學碩士不會告訴你這是編造的。幻覺是一種現象,其中人工智能模型生成的內容不基于真實的數據或信息,但創建完全虛構或不切實際的輸出。當生成模型(例如文本或圖像生成器)生成不連貫、不真實或與輸入數據或上下文不相關的內容時,可能會出現幻覺。這些幻覺可以表現為文本、圖像或其他類型的內容,這些內容看似合理,但完全是模型捏造的。
幻覺在生成人工智能中可能會出現問題,因為它們可能導致生成誤導性或虛假信息。
出于這些原因,生成式人工智能出現了一種新的設計模式:檢索增強生成(RAG)。讓我們首先看看這個新的最佳實踐,然后探討為什么使用 Apache Kafka 和 Flink 等技術進行數據流是 GenAI 企業架構的基本要求。
語義搜索和檢索增強生成 (RAG)
許多支持 GenAI 的應用程序都遵循檢索增強生成 (RAG) 的設計模式,將法學碩士與準確且最新的上下文相結合。 Pinecone(一個完全托管的矢量數據庫)背后的團隊有一個很好的解釋 使用此圖:
來源:松果
從較高層面來看,RAG 通常是兩個獨立的步驟。第一個是數據增強步驟,其中對不同的(通常是非結構化的)操作數據進行分塊,然后使用嵌入模型創建嵌入。嵌入被索引到向量數據庫中。矢量數據庫是一種工具,可讓語義搜索為不需要精確關鍵字匹配的提示找到相關上下文。
其次是推理步驟,GenAI 模型接收問題和上下文以生成可靠的答案(沒有幻覺)。 RAG 不會更新嵌入,而是檢索相關信息并與提示一起發送給 LLM。?
用于嵌入語義搜索的矢量數據庫
矢量數據庫,也稱為矢量存儲或矢量索引,是一種專門設計用于高效存儲和檢索矢量數據的數據庫。在這種情況下,矢量數據是指數值矢量的集合,它可以表示多種數據類型,例如文本、圖像、音頻或任何其他結構化或非結構化數據的嵌入。矢量數據庫在機器學習、數據檢索、推薦系統、相似性搜索等相關應用中非常有用。
矢量數據庫擅長執行相似性搜索,通常稱為語義搜索。他們可以根據各種相似性度量(例如余弦相似度或歐幾里德距離)快速找到與給定查詢向量相似或接近的向量。
矢量數據庫(不一定)不是一個單獨的數據庫類別。 Gradient Flow 解釋了其檢索增強生成的最佳實踐一個>:
<塊引用>
“矢量搜索不再局限于矢量數據庫。許多數據管理系統(包括 PostgreSQL)現在都支持矢量搜索。根據您的具體應用,您可能會找到滿足您特定需求的系統。接近實時或流優先嗎?檢查 Rockset 的產品。您已經在使用知識圖了嗎?Neo4j 對矢量搜索的支持意味著您的 RAG 結果將更容易解釋和可視化。”
塊引用>
對于另一個具體示例,請查看 MongoDB 的教程“使用 MongoDB 構建生成式 AI 應用程序:利用 Atlas 矢量搜索和開源模型的力量。”將 GenAI 用例的向量數據庫與 Apache Kafka 相結合有多種選擇。以下是事件驅動世界中可能的架構。
事件驅動架構:數據流 + Vector DB + LLM
事件驅動的應用程序可以更有效地實現檢索增強生成(RAG)、數據增強和模型推理這兩個步驟。使用 Apache Kafka 和 Apache Flink 的數據流可以實現任何規模的數據一致同步(如果應用程序或數據庫可以處理,則可以實時同步)和數據管理(= 流式 ETL)。
下圖顯示了利用事件驅動的數據流在整個 GenAI 管道中進行數據攝取和處理的企業架構:
此示例使用數據流將航班預訂和更改實時提取到 Kafka 的事件存儲中,以便稍后使用 GenAI 技術進行處理。 Flink 在調用嵌入模型為向量數據庫生成嵌入之前對數據進行預處理。與此同時,使用 Python 構建的實時客戶服務應用程序會消耗所有相關的上下文數據(例如,航班數據、客戶數據、嵌入等)來提示大型語言模型。法學碩士可以做出可靠的預測,例如為乘客重新預訂另一航班的建議。
在大多數企業場景中,出于安全和數據隱私原因,所有處理都在企業防火墻后面進行。法學碩士甚至可以與預訂引擎等交易系統集成,以執行重新預訂并將結果輸入相關應用程序和數據庫。
使用 API 進行請求-響應與事件驅動的數據流
在理想的世界中,一切都是基于事件的流數據。現實世界是不同的。因此,使用 HTTP/REST 或 SQL 進行請求-響應的 API 調用在企業架構的某些部分完全沒問題。由于 Kafka 真正解耦了系統,每個應用程序都可以選擇自己的通信范式和處理速度。因此,了解 HTTP/REST API 和 Apache Kafka 之間的權衡。
何時將請求-響應與 Apache Kafka 結合使用? — 此決定通常是根據延遲、解耦或安全性等權衡做出的。然而,對于大型法學碩士,情況發生了變化。由于法學碩士的培訓成本非常昂貴,因此現有法學碩士的可重用性至關重要。與使用決策樹、集群甚至小型神經網絡等算法構建的其他模型相反,將 LLM 嵌入到 Kafka Streams 或 Flink 應用程序中沒有什么意義。
同樣,增強模型通常通過 RPC/API 調用進行集成。通過將其嵌入到 Kafka Streams 微服務或 Flink 作業中,增強模型變得緊密耦合。如今專家主持了其中的許多活動,因為操作和優化它們并非易事。
托管 LLM 和增強模型的解決方案通常僅提供 HTTP 等 RPC 接口。這在未來可能會改變,因為請求-響應是流數據的反模式。 Seldon 是模型服務器演變的一個很好的例子。同時提供 Kafka 原生接口。在文章 使用 Kafka 原生模型部署進行流式機器學習。
法學碩士與企業其他部分之間的直接集成
在撰寫本文時,OpenAI 宣布了 GPT 來創建自定義版本ChatGPT 結合了說明、額外知識和任意技能組合。對于企業用途,最有趣的功能是開發人員可以將 OpenAI 的 GPT 連接到現實世界,即其他軟件應用程序、數據庫和云服務:
“除了使用我們的內置功能之外,您還可以通過為 GPT 提供一個或多個 API 來定義自定義操作。與插件一樣,操作允許 GPT 集成外部數據或與現實世界交互。連接 GPT數據庫、將它們插入電子郵件或讓它們成為您的購物助手。例如,您可以集成旅行列表數據庫、連接用戶的電子郵件收件箱或促進電子商務訂單。”
使用直接集成的權衡是緊密耦合和點對點通信。如果您已經使用 Kafka,您就會了解 具有真正解耦的領域驅動設計。
最后但并非最不重要的一點是,公共 GenAI API 和 LLM 的安全和治理策略薄弱。隨著人工智能數據的出現和點對點集成數量的增加,數據訪問、沿襲和安全挑戰不斷升級。
使用 Kafka、Flink 和 GenAI 進行數據流的實踐
在了解了很多理論之后,讓我們看一下將數據流與 GenAI 相結合的具體示例、演示和真實案例研究:
- 示例:Flink SQL + OpenAI API
- 演示:用于 RAG 和矢量搜索的 ChatGPT 4 + Confluence Cloud + MongoDB Atlas
- 成功案例:Elemental Cognition – 由 Confluence Cloud 提供支持的實時人工智能平臺
示例:Flink SQL + OpenAI API
使用 Kafka 和 Flink 進行流處理,實現實時數據和歷史數據的數據關聯。一個很好的例子,特別是對于生成式人工智能來說,是特定情境的客戶服務。我們在這里以航空公司和航班取消為例。
有狀態流處理器從 CRM、忠誠度平臺和其他應用程序獲取現有客戶信息,將其與客戶對聊天機器人的查詢相關聯,并對 LLM 進行 RPC 調用。
下圖使用 Apache Flink 和 Flink SQL 用戶定義函數 (UDF)。 SQL 查詢將預處理后的數據輸入 OpenAI API 以獲得可靠的答案。答案被發送到下游應用程序使用它的另一個 Kafka 主題,例如,用于重新預訂機票、更新忠誠度平臺,以及將數據存儲在數據湖中以供以后的批處理和分析。

演示:ChatGPT 4 + Confluence Cloud + 用于 RAG 和矢量搜索的 MongoDB Atlas
我的同事 Britton LaRoche 構建了一個出色的零售演示,展示了用于數據集成和處理的 Kafka 與用于存儲和語義向量搜索的 MongoDB 的結合。 AI視頻創作平臺D-ID通過視覺AI頭像代替命令行界面,讓演示更加美觀。

Confluence Cloud 和 MongoDB Atlas 的完全托管和深度集成服務使我們能夠專注于構建業務邏輯。
該架構與我上面的基于事件的流式傳輸示例不同。核心仍然是Kafka來真正解耦應用。大多數服務都是通過請求-響應 API 集成的。這很簡單、易于理解,而且通常足夠好。您可以稍后使用 Python Kafka API 輕松遷移到基于事件的模式、更改 Kafka 的數據捕獲 (CDC)、將 LangChain Python UDF 嵌入 Apache Flink 或使用 AsyncAPI 等異步接口。
下面是一個簡短的五分鐘演示,將帶您了解 RAG 和語義搜索的演示,使用 MongoDB Atlas、Confluence 作為集成中心、D-ID 作為與最終用戶的通信接口:
成功案例:Elemental Cognition — 由 Kafka 和 Confluence Cloud 提供支持的實時 GenAI 平臺
博士。 David Ferrucci 是著名的人工智能研究員和 IBM 突破性 Watson 技術的發明者,于 2015 年創立了 Elemental Cognition。該公司利用 GenAI 來加速和改進關鍵決策,而信任、精確度和透明度都很重要。
元素認知技術可跨行業和用例使用。主要目標是醫療保健/生命科學、投資管理、情報、物流和調度以及聯絡中心。
人工智能平臺開發負責任且透明的人工智能,幫助解決問題并提供可以理解和信任的專業知識。
Elemental Cognition 的方法將不同的人工智能策略結合在一個新穎的架構中,該架構獲取并推理人類可讀的知識,以協作和動態地解決問題。其結果是將專家解決問題的情報更加透明且更具成本效益地交付到對話和發現應用程序中。
Confluence Cloud 為 AI 平臺提供支持,以實現可擴展的實時數據和數據集成用例。我建議您查看他們的網站,從各種令人印象深刻的用例中學習。

Apache Kafka 作為 GenAI 企業架構的中樞神經系統
生成式 AI (GenAI) 需要更改 AI/ML 企業架構。增強模型、法學碩士、帶有向量數據庫的 RAG 和語義搜索需要數據集成、關聯和解耦。使用 Kafka 和 Flink 進行數據流可以為您提供幫助。
許多應用程序和數據庫使用 REST/HTTP、SQL 或其他接口進行請求-響應通信。那完全沒問題。為您的數據產品和應用程序選擇正確的技術和集成層。但要保證數據的一致性。
使用 Apache Kafka 和 Apache Flink 進行數據流處理使開發人員和數據工程師能夠專注于其數據產品或集成項目中的業務問題,因為它真正解耦了不同的領域。可以通過 HTTP、Kafka API、AsyncAPI、來自數據庫的 CDC、SaaS 接口和許多其他選項與 Kafka 集成。
Kafka 支持系統與任何通信范式的連接。其事件存儲以毫秒為單位共享數據(即使在極端規模下),但也為較慢的下游應用程序保留數據并重播歷史數據。 數據網格的核心必須實時跳動。對于任何優秀的企業架構來說都是如此。 GenAI 也不例外。
如何利用 Apache Kafka 構建對話式 AI、聊天機器人和其他 GenAI 應用程序?您是否使用 Flink 和矢量數據庫實時構建 RAG 來為 LLM 提供正確的上下文?讓我們在 LinkedIn 上聯系并進行討論!通過訂閱我的時事通訊隨時了解新的博客文章。

