Presto on Apache Kafka 在 Uber 的大規模應用
InfoQ · 科技 ·

Presto on Apache Kafka 在 Uber 的大規模應用

。在這篇文章中,我們將探討如何將這兩個重要的服務結合起來,即在Uber

本文最初發布於 Uber 官方博客,InfoQ 經授權翻譯如下


Uber 的目的就是要讓全世界變得更好,而大數據是一個非常重要的部分。Presto 和 Apache Kafka 在 Uber 的大數據棧中扮演了重要角色。Presto 是查詢聯盟的事實標準,它已經在交互查詢、近實時數據分析以及大規模數據分析中得到應用。Kafka 是一個支持很多用例的數據流中樞,比如 pub/sub、流處理等。在這篇文章中,我們將探討如何將這兩個重要的服務結合起來,即在 Uber 的 Kafka 上,通過 Presto 實現輕量級的交互式 SQL 查詢。


afka是一個支持很多用例的數據流中樞,比如pub/sub、流處理等

圖 1:Uber 的大數據棧


Uber 的 Presto 項目


Uber 通過開源的 Presto,可以對任何數據源進行查詢,不管是動態數據還是靜態數據。Presto 的多功能性讓我們可以做出智能的、數據驅動的業務決策。我們運營著大約 15 個 Presto 集群,跨越 5000 多個節點。我們擁有 7000 個每周活躍的用戶,他們每天都會進行 50 萬次的查詢,從 HDFS 中讀取 50PB 左右的數據。現在,Presto 可以通過可擴展的數據源連接器,查詢多種數據源,比如 Apache Hive、Apache Pinot、AresDb、MySQL、Elasticsearch 和 Apache Kafka。你還可以在我們之前的一些博文中找到更多有關 Presto 的信息:


  • 《在 Uber 使用 Presto 和 Apache Parquet 進行工程數據分析》(Engineering Data Analytics with Presto and Apache Parquet at Uber)
  • 《構建一個更好的大數據架構:認識 Uber 的 Presto 團隊》(Building a Better Big Data Architecture: Meet Uber’s Presto Team)

Uber 的 Apache Kafka 項目


Uber 是 Apache Kafka 部署規模最大的公司之一,每天處理數萬億條消息和多達 PB 的數據。從圖 2 可以看出,Apache Kafka 是我們技術棧的基礎,支持大量不同的工作流,其中包括一個 pub-sub 消息總線,用於從 Rider 和 Driver 應用中傳送事件數據,諸如 Apache Flink® 的流分析,把資料庫變更記錄傳送到下游用戶,並且把各種各樣的數據攝入到 Uber 的 Apache Hadoop® 數據湖中。我們已經完成了許多有趣的工作,以保證其性能、可靠性和用戶友好,比如:


  • 《Uber 的多區域 Kafka 的災難恢復》(Disaster Recovery for Multi-Region Kafka at Uber)
  • 《使用消費者代理實現無縫 Kafka 異步排隊》(Enabling Seamless Kafka Async Queuing with Consumer Proxy)
  • 《使用 Apache Flink、Kafka 和 Pinot 進行實時精確的廣告事件處理》(Real-Time Exactly-Once Ad Event Processing with Apache Flink, Kafka, and Pinot)


實標準,它已經在交互查詢、近實時數據分析以及大規模數據分析中得到應用。K

圖 2:Uber 的 Kafka 項目


問題陳述


多年來,Uber 的數據團隊對於 Kafka 流的分析需求不斷增加,這是由於實時數據的及時、臨時的數據分析為數據科學家、工程師和運營團隊提供了寶貴的信息。


下面是 Kafka 團隊的一個典型請求示例:運營團隊正在調查為何一些消息沒有被一個關鍵的服務所處理,從而會對最終用戶造成直接影響。運營團隊隨後收集了一些 UUID,這些 UUID 報告了問題,並要求檢查它們是否存在於服務的輸入 / 輸出 Kafka 流中。如圖 3 所示,該請求可以被表述為查詢:「Kafka 主題 T 中是否缺少 UUID 為 X 的順序?」


在Uber的大數據棧中扮演了重要角色。Presto是查詢聯盟的事

圖 3:假定用例:檢查 Kafka 主題中是否缺少 UUID X 的順序


考慮的替代方案


這樣的問題一般都是由大數據進行實時分析來解決。在這個領域的各種技術中,我們專注於兩類開源解決方案,即:流處理和實時 OLAP 數據存儲。


流處理引擎,例如 Apache Flink、Apache Storm™ 或 ksql 可以持續地處理流,並且輸出經過處理的流或者增量的維護可更新的視圖。由於用戶想要在以前的事件中執行點查詢或者執行分析查詢,所以這種流處理並不適用於以上問題。


另一方面,實時 OLAP 數據存儲,如 Apache Pinot、Apache Druid 和 Clickhouse,則更適合。這些 OLAP 存儲配備了高級的索引技術,所以可以為 Kafka 數據流建立索引,從而實現低延遲的查詢。實際上,Uber 早在數年之前就已經開始使用 Apache Pinot,而現在,Pinot 已經成為 Uber 數據平台中的一個重要技術,它可以為多個關鍵任務進行實時分析應用。你可以看看我們以前發表的博文,討論 Uber 如何使用 Pinot。


但是,實時 OLAP 需要一個非同尋常的加載過程,以創建一個從 Kafka 流中攝入的表,並對該表進行優化以達到最好的性能。另外,OLAP 存儲還需要存儲和計算資源來提供服務,因此這種解決方案被推薦給那些反覆查詢表並要求較低延遲的用例(如面向用戶的應用),但不包括臨時性的故障排除或探索。


所以,這個問題促使 Kafka 和 Presto 團隊共同尋找一種基於下列因素的輕量級解決方案:


  1. 它重用了現有的 Presto 部署,這是一項成熟的技術,在 Uber 已有多年實戰檢驗。
  2. 它不需要任何加載:Kafka 主題可以被發現,並且在創建後可以立即被查詢。
  3. Presto 以其強大的跨數據源的查詢聯合能力而聞名,因此可以讓 Kafka 和其他數據源(如 Hive/MySQL/Redis)進行關聯,從而獲得跨數據平台的洞察力。


然而,這種 Presto 方法也存在其局限性。例如,由於 Kafka 連接器沒有建立索引,所以它的性能比實時 OLAP 存儲要差。另外,對於 Uber 的可擴展性需求,在連接器上還有其他挑戰需要解決,我們將在下一節詳細說明。


Uber 面臨的挑戰


Presto 已經有一個 Kafka 連接器,支持通過 Presto 查詢 Kafka。然而,這個解決方案並不完全適合我們在 Uber 的大規模 Kafka 架構。其中存在一些挑戰:


  • Kafka Topic 和 Cluster Discovery:在 Uber,我們將 Kafka 作為一種服務來提供,用戶可以隨時通過自助服務門戶向 Kafka 搭載新的主題。因此,我們必須要有一個動態的 Kafka 主題發現。但是,當前 Presto Kafka 連接器中的 Kafka 主題和集群發現是靜態的,因此需要我們在每次搭載新主題時都要重啟連接器。
  • 數據模式發現:與 Kafka 主題和集群發現類似,我們將模式註冊作為一項服務提供,並支持用戶自助加載。因此,我們需要 Presto-Kafka 連接器能夠按需檢索最新的模式。
  • 查詢限制:對於我們來說,限制每一個查詢能夠從 Kafka 中消耗的數據數量非常重要。Uber 擁有很多大型的 Kafka 主題,其字節率可以達到 500M/s。我們知道,與其他替代方案相比,Presto-Kafka 查詢速度相對緩慢,而要從 Kafka 中提取大量數據的查詢,則要花費相當長的時間。這對於用戶的體驗和 Kafka 集群的健康都是不利的。
  • 配額控制:作為一個分布式的查詢引擎,Presto 可以以非常高的吞吐量同時消耗 Kafka 的消息,這可能會導致 Kafka 集群的潛在集群退化。限制 Presto 的最大消費吞吐量對於 Kafka 集群的穩定性至關重要。


架構


Uber 的數據生態系統為用戶提供了一種方法,可以編寫一個 SQL 查詢,並將其提交給 Presto 集群執行。每個 Presto 集群都有一個協調器節點,負責解析 SQL 語句,規劃查詢,並為人工節點執行的任務進行調度。Presto 內部的 Kafka 連接器允許將 Kafka 主題作為表格使用,主題中的每條消息在 Presto 中被表示為一行。在收到查詢時,協調器會確定查詢是否有適當的過濾器。一旦驗證完成,Kafka 連接器從 Kafka 集群管理服務中獲取集群和主題信息,從模式服務中獲取模式。然後, Presto 工作器與 Kafka 集群並行對話,獲取所需的 Kafka 消息。我們還為 Presto 用戶在 Kafka 集群上設置了一個代理配額,這可以防止集群的降級。


而大數據是一個非常重要的部分。Presto和ApacheKafka

圖 4:高級架構


詳細改進


下面幾節將深入探討我們為克服現有 Presto Kafka 連接器的局限性所做的改進,使其能夠用於大規模用例。


Kafka 集群 / 主題和數據模式發現


我們做了一些改變以實現按需的集群 / 主題和模式發現。首先,Kafka 主題元數據和數據模式是在運行時通過 KafkaMetadata 獲取的,我們提取了 TableDescriptionSupplier 接口來提供這些元數據,然後我們擴展了該接口並實現了一個新的策略,在運行時從內部 Kafka 集群管理服務和模式註冊中心讀取 Kafka 主題元數據。同樣地,我們重構了 KafkaClusterMetadataSupplier,並實現了一個新的策略,在運行時讀取集群元數據。由於集群元數據是按需獲取的,我們也能夠在一個 Kafka 連接器中支持多個 Kafka 集群。為所有這些元數據增加一個緩存層,以減少對 Kafka 集群管理模式服務的請求數量。


權翻譯如下Uber的目的就是要讓全世界變得更好,

圖 5:Kafka 集群 / 主題和數據模式發現


查詢過濾器


為了提高 Kafka 和 Presto 集群的可靠性,我們希望避免大型查詢讀取過多的數據。為了實現這一點,我們增加了列過濾器的執行,檢查 Kafka 的 Presto 查詢的過濾器約束中是否存在 _timestamp_partition_offset。沒有這些過濾器的查詢將被拒絕。


Kafka 集群的配額控制


Kafka 是 Uber 的一個重要的基礎設施,有很多實時用例,Kafka 集群的退化可能會產生巨大的影響,所以我們要不惜一切代價避免它。作為一個分布式的查詢引擎,Presto 可能會啟動數百個消費者線程,從 Kafka 並發地獲取消息。這種消費模式可能會耗盡網絡資源,並導致潛在的 Kafka 集群退化,這是我們想要防止的。


我們可以做的一件事是,在 Presto 集群層面上限制消費率,但從 Presto 方面來說,這不是很容易實現。作為一種選擇,我們決定利用 Kafka 的代理配額來實現我們的目標。我們做了一個改變,允許我們從連接器配置中指定一個 Kafka 消費者客戶端 ID。有了這個改變,我們就能為 Presto 中的所有工作者使用一個靜態的 Kafka 客戶端 ID,而且他們將受制於同一個配額池。


當然,這種方法是有代價的。多個 presto 查詢同時進行,將需要更長的時間來完成。這是我們不得不作出的犧牲。在現實中,由於我們擁有查詢過濾器,所以大部分的查詢都可以在一定的時間裡完成。


結論


在推出該特性後,我們看到在做臨時探索時,生產力有了很大的提高。在這之前,工程師們需要花費數十分鐘甚至更長的時間來查詢我們上面提到的例子的數據,但現在我們可以寫一個簡單的 SQL 查詢 SELECT * FROM kafka.cluster.order WHERE uuid= 『0e43a4-5213-11ec』,結果可以在幾秒鐘內返回。


本文最初發布於Uber官方博客,InfoQ經授

圖 6:假設的用例。檢查 Kafka 主題中是否缺少 UUID X 的順序


截至寫這篇博文時,越來越多的用戶開始採用 Presto on Kafka 進行臨時探索。每天有 6000 個查詢,我們也從 Presto 用戶那裡得到了很好的反饋,他們說 Presto on Kafka 讓他們的數據分析變得更加容易。


在未來,我們計劃將我們所做的改進貢獻給開源社區。你也可以查看我們的 PrestoCon 演講,了解更多關於我們所做工作的細節。


作者介紹:


Yang Yang,Uber 流數據團隊軟體工程師,致力於在 Uber 構建高度可擴展、可靠的 Kafka 生態系統,包括 uReplicator、Kafka Consumer Proxy 等內部工具。


Yupeng Fu,Uber 數據團隊首席軟體工程師。領導幾個流團隊構建可擴展、可靠和性能良好的流解決方案。他還是 Apache Pinot 的提交者。


Hitarth Trivedi,Uber 數據分析團隊的高級軟體工程師。他主要負責 Presto 的工作。


原文連結:


https://eng.uber.com/presto-on-apache-kafka-at-uber-scale/

聲明:文章觀點僅代表作者本人,PTTZH僅提供信息發布平台存儲空間服務。
喔!快樂的時光竟然這麼快就過⋯
繼續其他精彩內容吧!
more