1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務時間:8:30-17:00
      你可能遇到了下面的問題
      關閉右側工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      如何使用Apache查詢Pulsar流

      本篇文章給大家分享的是有關如何使用 Apache查詢Pulsar流,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

      創(chuàng)新互聯(lián)公司是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設公司,自成立以來公司不斷探索創(chuàng)新,始終堅持為客戶提供滿意周到的服務,在本地打下了良好的口碑,在過去的十載時間我們累計服務了上千家以及全國政企客戶,如橡塑保溫等企業(yè)單位,完善的項目管理流程,嚴格把控項目進度與質量監(jiān)控加上過硬的技術實力獲得客戶的一致贊揚。

      這里將介紹 Apache Pulsar 和 Apache Flink 的集成和最新研發(fā)進展,并詳細說明如何利用 Pulsar 內置 schema,使用 Apache Flink 實時查詢 Pulsar 流。


      Apache Pulsar 簡介

      Apache Pulsar 是一個靈活的發(fā)布/訂閱消息系統(tǒng),支持持久日志存儲。  Pulsar 的架構優(yōu)勢包括多租戶、統(tǒng)一消息模型、結構化事件流、云原生架構等,這些優(yōu)勢讓 Pulsar 能夠完美適用于多種用戶場景,從計費、支付、交易服務到融合組織中不同的消息架構。  

      現(xiàn)有 Pulsar & Flink 集成

      (Apache Flink 1.6+)

      在現(xiàn)有的 Pulsar 和 Flink 集成中,Pulsar 作為 Flink 應用程序中的消息隊列來使用。Flink 開發(fā)人員可以選擇特定 Pulsar source,并連接到所需的 Puslar 集群和 topic,將 Pulsar 用作 Flink 的流 source 和流 sink:

      // create and configure Pulsar consumerPulsarSourceBuilderbuilder = PulsarSourceBuilder    .builder(new SimpleStringSchema())   .serviceUrl(serviceUrl)  .topic(inputTopic)  .subsciptionName(subscription);SourceFunction src = builder.build();// ingest DataStream with Pulsar consumerDataStream words = env.addSource(src);

      然后,Pulsar 流可以連接到 Flink 的處理邏輯。

      // perform computation on DataStream (here a simple WordCount)DataStream wc = words  .flatmap((FlatMapFunction) (word, collector) -> {    collector.collect(new WordWithCount(word, 1));  })
       .returns(WordWithCount.class)  .keyBy("word")  .timeWindow(Time.seconds(5))  .reduce((ReduceFunction) (c1, c2) ->    new WordWithCount(c1.word, c1.count + c2.count));

      然后通過 sink 將數(shù)據(jù)寫出到 Pulsar。

      // emit result via Pulsar producer wc.addSink(new FlinkPulsarProducer<>(  serviceUrl,  outputTopic,  new AuthentificationDisabled(),  wordWithCount -> wordWithCount.toString().getBytes(UTF_8),  wordWithCount -> wordWithCount.word));

      對于集成而言,這是重要的第一步,但現(xiàn)有設計還不足以充分利用 Pulsar 的全部功能。

      Pulsar 與 Flink 1.6.0 的集成中有一些不足,包括:既沒有作為持久存儲來使用,也沒有與 Flink 進行 schema 集成,導致在為應用程序 schema 注冊添加描述時,需要手動輸入。


      Pulsar 與 Flink 1.9 的集成

      將 Pulsar 用作 Flink catalog

      Flink 1.9.0 與 Pulsar 的最新集成解決了前面提到的問題。阿里巴巴 Blink 對 Flink 倉庫的貢獻不僅強化了處理架構,還增加了新功能,使得 Flink 與 Pulsar 的集成更強大有效。

      Flink 1.9.0:
      https://flink.apache.org/downloads.html#apache-flink-191


      在新 connector 的實現(xiàn)中引入了 Pulsar schema 集成,增加了對 Table API 的支持,同時提供了 exactly-once 語義的 Pulsar 讀與 at-least-once 語義的 Pulsar 寫。

      并且,通過 schema 集成,Pulsar 可以注冊為 Flink catalog,只需幾個命令就可以在 Pulsar 流上運行 Flink 查詢。下面我們將詳細介紹新的集成,并舉例說明如何使用 Flink SQL 查詢 Pulsar 流。

      利用 Flink <> Pulsar Schema 集成

      在展開集成細節(jié)與具體的使用方法之前,我們先來看一下 Pulsar schema 是怎么工作的。

      Apache Pulsar 內置對 Schema 的支持,無須額外管理 schema。Pulsar 的數(shù)據(jù) schema 與每個 topic 相關聯(lián),因此,producer 和 consumer 都可以使用預定義 schema 信息發(fā)送數(shù)據(jù),而 broker 可以驗證 schema ,并在兼容性檢查中管理 schema 多版本化和 schema 演化。

      下面分別是 Pulsar schema 用于 producer 和 consumer 的示例。在 producer 端,可以指定使用 schema,并且 Pulsar 無需執(zhí)行序列化/反序列化,就可以發(fā)送一個 POJO 類。

      類似地,在 consumer 端,也可以指定數(shù)據(jù) schema,并且在接收到數(shù)據(jù)后,Pulsar 會立即自動驗證 schema 信息,獲取給定版本的 schema,然后將數(shù)據(jù)反序列化到 POJO 結構。Pulsar 在 topic 的元數(shù)據(jù)中存儲 schema 信息。

      // Create producer with Struct schema and send messagesProducer producer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage()  .value(User.builder()    .userName(“pulsar-user”)    .userId(1L)    .build())  .send();// Create consumer with Struct schema and receive messagesConsumer consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();consumer.receive();

      假設一個應用程序對 producer 和/或 consumer 指定 schema。在接收到 schema 信息時,連接到 broker 的 producer(或 consumer)傳輸此類信息,以便 broker 在返回或拒絕該 schema 前注冊 schema、驗證 schema,并檢查 schema 兼容性,如下圖所示:

      如何使用 Apache查詢Pulsar流

      Pulsar 不僅可以處理并存儲 schema 信息,還可以在必要時處理 schema 演化(schema evolution)。Pulsar 能夠有效管理 broker 中的 schema 演化,在必要的兼容性檢查中,追蹤 schema 的所有版本。

      另外,當消息發(fā)布在 producer 端時,Pulsar 會在消息的元數(shù)據(jù)中標記 schema 版本;當 consumer 接收到消息,并完成反序列化元數(shù)據(jù)時,Pulsar 將會檢查與此消息相關聯(lián)的 schema 版本,并從 broker 中獲取 schema 信息。

      因此,當 Pulsar 與 Flink 應用集成時,Pulsar 使用預先存在的 schema 信息,并將帶有 schema 信息的單個消息映射到 Flink 類型系統(tǒng)的不同行中。

      當 Flink 用戶不直接與 schema 交互或不使用原始 schema(primitive schema)時(例如,用 topic 來存儲字符串或長數(shù)值),Pulsar 會轉換消息到 Flink 行,即“值”;或者在結構化的 schema 類型(例如,JSON 和 AVRO)中,Pulsar 從 schema 信息中提取單個字段信息,并將字段映射到 Flink 的類型系統(tǒng)。

      最后,所有與消息相關的元數(shù)據(jù)信息(例如,消息密鑰、topic、發(fā)布時間、事件時間等)都會轉換到 Flink 行中的元數(shù)據(jù)字段。以下是使用原始 schema 和結構化 schema 的兩個示例,解釋了如何將數(shù)據(jù)從 Pulsar topic 轉換到 Flink 類型系統(tǒng)。

      原始 schema(Primitive Schema):

      root|-- value: DOUBLE|-- __key: BYTES|-- __topic: STRING|-- __messageId: BYTES|-- __publishTime: TIMESTAMP(3)|-- __eventTime: TIMESTAMP(3)

      結構化 schema(Avor Schema):

      @Data@AllArgsConstructor@NoArgsConstructorpublic static class Foo {    public int i;    public float f;    public Bar bar;}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Bar {    public boolean b;    public String s;}Schema s = Schema.AVRO(Foo.getClass());
      root |-- i: INT |-- f: FLOAT |-- bar: ROW<`b` BOOLEAN, `s` STRING> |-- __key: BYTES |-- __topic: STRING |-- __messageId: BYTES |-- __publishTime: TIMESTAMP(3) |-- __eventTime: TIMESTAMP(3)

      當所有 schema 信息都映射到 Flink 類型系統(tǒng)時,就可以在 Flink 中根據(jù)指定 schema 信息構建 Pulsar source、sink 或 catalog,如下所示:

      Flink & Pulsar: 從 Pulsar 讀取數(shù)據(jù)

      1. 創(chuàng)建用于流查詢的 Pulsar source

      val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Properties()props.setProperty("service.url", "pulsar://...")props.setProperty("admin.url", "http://...")props.setProperty("partitionDiscoveryIntervalMillis", "5000")props.setProperty("startingOffsets", "earliest")props.setProperty("topic", "test-source-topic")val source = new FlinkPulsarSource(props)// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryableval dataStream = env.addSource(source)(null)
      // chain operations on dataStream of Row and sink the output// end method chaining
      env.execute()

      2. 將 Pusar 中的 topic 注冊為 streaming tables

      val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)
      val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
      tEnv  .connect(new Pulsar().properties(props))  .inAppendMode()  .registerTableSource("sink-table")
      val sql = "INSERT INTO sink-table ....."tEnv.sqlUpdate(sql)env.execute()

      Flink & Pulsar:向 Pulsar 寫入數(shù)據(jù)

      1. 創(chuàng)建用于流查詢的 Pulsar sink

      val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream = .....
      val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
      stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))env.execute()

      2. 向 Pulsar 寫入 streaming table

      val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)
      val prop = new Properties()prop.setProperty("service.url", serviceUrl)prop.setProperty("admin.url", adminUrl)prop.setProperty("flushOnCheckpoint", "true")prop.setProperty("failOnWrite", "true")props.setProperty("topic", "test-sink-topic")
      tEnv  .connect(new Pulsar().properties(props))  .inAppendMode()  .registerTableSource("sink-table")
      val sql = "INSERT INTO sink-table ....."tEnv.sqlUpdate(sql)env.execute()

      在以上示例中,F(xiàn)link 開發(fā)人員都無需擔心 schema 注冊、序列化/反序列化,并將 Pulsar 集群注冊為 Flink 中的 source、sink 或 streaming table。

      當這三個要素同時存在時,Pulsar 會被注冊為 Flink 中的 catalog,這可以極大簡化數(shù)據(jù)處理與查詢,例如,編寫程序從 Pulsar 查詢數(shù)據(jù),使用 Table API 和 SQL 查詢 Pulsar 數(shù)據(jù)流等。

      以上就是如何使用 Apache查詢Pulsar流,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


      網(wǎng)站題目:如何使用Apache查詢Pulsar流
      URL鏈接:http://www.ef60e0e.cn/article/ggojhg.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        两当县| 长阳| 馆陶县| 沭阳县| 惠东县| 河曲县| 大庆市| 龙游县| 汽车| 亚东县| 英吉沙县| 塔城市| 阳新县| 乡宁县| 冕宁县| 海盐县| 福海县| 宁海县| 客服| 澄江县| 福泉市| 逊克县| 正宁县| 沙洋县| 潞西市| 河曲县| 白玉县| 卫辉市| 池州市| 黔西县| 沧州市| 香河县| 青浦区| 惠州市| 平果县| 财经| 四平市| 扎赉特旗| 北碚区| 滦南县| 北碚区|