1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      logstash升級kafka插件

      Logstash 2.x版本kafka升級

      V1

      10年積累的成都網(wǎng)站設(shè)計、做網(wǎng)站、成都外貿(mào)網(wǎng)站建設(shè)公司經(jīng)驗,可以快速應(yīng)對客戶對網(wǎng)站的新想法和需求。提供各種問題對應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識你,你也不認(rèn)識我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有麥積免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

      前言

      Logstash 2.x版本output-kafka插件只支持kafka-0.8.x版本。但是工作中我們可能用到0.9.x版本的kafka。故而需要升級Logstash-output-kafka插件至3.x版本。

      • 安裝依賴包

      yum -y install ruby rubygems ruby-devel
      gem sources --add https://ruby.taobao.org/ --remove http://rubygems.org/
      gem install jar-dependencies -v '0.3.4'
      gem install ruby-maven -v '3.3.11'
      • 升級output-kafka

      /usr/local/logstash/bin/logstash-plugin update logstash-output-kafka
      • 啟動logstash 有如下警告信息

      ./logstash -f /usr/local/logstash/conf/kafka.conf 
      Settings: Default pipeline workers: 8
      log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
      log4j:WARN Please initialize the log4j system properly.
      log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
      Pipeline main started

      解決辦法

      參考網(wǎng)站

      1.切換到/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/目錄下

      cd /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/

      2.備份kafka.rb文件

      mv kafka.rb{,.backup}

      3.新建kafka.rb文件內(nèi)容如下:

      require 'logstash/namespace'
      require 'logstash/outputs/base'
      require 'jruby-kafka'
      
      # Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
      # the broker.
      #
      # The only required configuration is the topic name. The default codec is json,
      # so events will be persisted on the broker in json format. If you select a codec of plain,
      # Logstash will encode your messages with not only the message but also with a timestamp and
      # hostname. If you do not want anything but your message passing through, you should make the output
      # configuration something like:
      # [source,ruby]
      #     output {
      #       kafka {
      #         codec => plain {
      #            format => "%{message}"
      #         }
      #       }
      #     }
      # For more information see http://kafka.apache.org/documentation.html#theproducer
      #
      # Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs
      class LogStash::Outputs::Kafka < LogStash::Outputs::Base
        config_name 'kafka'
      
        default :codec, 'json'
      
        # The topic to produce messages to
        config :topic_id, :validate => :string, :required => true
        # This is for bootstrapping and the producer will only use it for getting metadata (topics,
        # partitions and replicas). The socket connections for sending the actual data will be
        # established based on the broker information returned in the metadata. The format is
        # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
        # subset of brokers.
        config :bootstrap_servers, :validate => :string, :default => 'localhost:9092'
        # Serializer class for the key of the message
        config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
        # Serializer class for the value of the message
        config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
        # The key that will be included with the record
        #
        # If a `message_key` is present, a partition will be chosen using a hash of the key.
        # If not present, a partition for the message will be assigned in a round-robin fashion.
        config :message_key, :validate => :string
        # The number of acknowledgments the producer requires the leader to have received
        # before considering a request complete.
        #
        # acks=0,   the producer will not wait for any acknowledgment from the server at all.
        # acks=1,   This will mean the leader will write the record to its local log but
        #           will respond without awaiting full acknowledgement from all followers.
        # acks=all, This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
        config :acks, :validate => ["0", "1", "all"], :default => "1"
        # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
        config :buffer_memory, :validate => :number, :default => 33554432
        # The compression type for all data generated by the producer.
        # The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
        config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none"
        # Setting a value greater than zero will cause the client to
        # resend any record whose send fails with a potentially transient error.
        config :retries, :validate => :number, :default => 0
        # The producer will attempt to batch records together into fewer requests whenever multiple
        # records are being sent to the same partition. This helps performance on both the client
        # and the server. This configuration controls the default batch size in bytes.
        config :batch_size, :validate => :number, :default => 16384
        # The id string to pass to the server when making requests.
        # The purpose of this is to be able to track the source of requests beyond just
        # ip/port by allowing a logical application name to be included with the request
        config :client_id, :validate => :string
        # The producer groups together any records that arrive in between request
        # transmissions into a single batched request. Normally this occurs only under
        # load when records arrive faster than they can be sent out. However in some circumstances
        # the client may want to reduce the number of requests even under moderate load.
        # This setting accomplishes this by adding a small amount of artificial delay—that is,
        # rather than immediately sending out a record the producer will wait for up to the given delay
        # to allow other records to be sent so that the sends can be batched together.
        config :linger_ms, :validate => :number, :default => 0
        # The maximum size of a request
        config :max_request_size, :validate => :number, :default => 1048576
        # The size of the TCP receive buffer to use when reading data
        config :receive_buffer_bytes, :validate => :number, :default => 32768
        # The size of the TCP send buffer to use when sending data.
        config :send_buffer_bytes, :validate => :number, :default => 131072
        # The configuration controls the maximum amount of time the server will wait for acknowledgments
        # from followers to meet the acknowledgment requirements the producer has specified with the
        # acks configuration. If the requested number of acknowledgments are not met when the timeout
        # elapses an error will be returned. This timeout is measured on the server side and does not
        # include the network latency of the request.
        config :timeout_ms, :validate => :number, :default => 30000
        # When our memory buffer is exhausted we must either stop accepting new
        # records (block) or throw errors. By default this setting is true and we block,
        # however in some scenarios blocking is not desirable and it is better to immediately give an error.
        config :block_on_buffer_full, :validate => :boolean, :default => true
        # the timeout setting for initial metadata request to fetch topic metadata.
        config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000
        # the max time in milliseconds before a metadata refresh is forced.
        config :metadata_max_age_ms, :validate => :number, :default => 300000
        # The amount of time to wait before attempting to reconnect to a given host when a connection fails.
        config :reconnect_backoff_ms, :validate => :number, :default => 10
        # The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
        config :retry_backoff_ms, :validate => :number, :default => 100
      
        public
        def register
          LogStash::Logger.setup_log4j(@logger)
      
          options = {
            :key_serializer => @key_serializer,
            :value_serializer => @value_serializer,
            :bootstrap_servers => @bootstrap_servers,
            :acks => @acks,
            :buffer_memory => @buffer_memory,
            :compression_type => @compression_type,
            :retries => @retries,
            :batch_size => @batch_size,
            :client_id => @client_id,
            :linger_ms => @linger_ms,
            :max_request_size => @max_request_size,
            :receive_buffer_bytes => @receive_buffer_bytes,
            :send_buffer_bytes => @send_buffer_bytes,
            :timeout_ms => @timeout_ms,
            :block_on_buffer_full => @block_on_buffer_full,
            :metadata_fetch_timeout_ms => @metadata_fetch_timeout_ms,
            :metadata_max_age_ms => @metadata_max_age_ms,
            :reconnect_backoff_ms => @reconnect_backoff_ms,
            :retry_backoff_ms => @retry_backoff_ms
          }
          @producer = Kafka::KafkaProducer.new(options)
          @producer.connect
      
          @logger.info('Registering kafka producer', :topic_id => @topic_id, :bootstrap_servers => @bootstrap_servers)
      
          @codec.on_event do |event, data|
            begin
              key = if @message_key.nil? then nil else event.sprintf(@message_key) end
              @producer.send_msg(event.sprintf(@topic_id), nil, key, data)
            rescue LogStash::ShutdownSignal
              @logger.info('Kafka producer got shutdown signal')
            rescue => e
              @logger.warn('kafka producer threw exception, restarting',
                           :exception => e)
            end
          end
        end # def register
      
        def receive(event)
          
          if event == LogStash::SHUTDOWN
            return
          end
          @codec.encode(event)
        end
      
        def close
          @producer.close
        end
      end #class LogStash::Outputs::Kafka

      當(dāng)前標(biāo)題:logstash升級kafka插件
      分享地址:http://www.ef60e0e.cn/article/ijpdss.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        鞍山市| 荥阳市| 寿宁县| 临安市| 和平县| 肥东县| 樟树市| 丹寨县| 车险| 全南县| 昌邑市| 迁西县| 青河县| 溧阳市| 新蔡县| 沭阳县| 东乡县| 行唐县| 博兴县| 延川县| 大港区| 清丰县| 宁德市| 巴塘县| 卫辉市| 永靖县| 凤山市| 唐河县| 双江| 灵璧县| 绥阳县| 龙岩市| 肥城市| 霍邱县| 石城县| 山阴县| 绩溪县| 中阳县| 万源市| 永宁县| 通州区|