博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume监控文件并将数据输入至Kafka
阅读量:3959 次
发布时间:2019-05-24

本文共 1975 字,大约阅读时间需要 6 分钟。

Flume配置文件

a1.sources = r1a1.sinks = k1a1.channels = c1 a1.sources.r1.type = execa1.sources.r1.bind = tail -F /usr/local/hive.log a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = day02a1.sinks.k1.brokerList = hadoop:9092a1.sinks.k1.batchSize = 20a1.sinks.k1.requiredAcks = 1 a1.channels.c1.type = memory a1.sources.r1.channels = c1a1.sinks.k1.channel = c1

启动Flume

flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file /opt/flume/flume/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

启动Kafka

kafka-server-start.sh /opt/kafka/kafka/config/server.properties 

创建Topic

kafka-topics.sh  --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic day02

查看Topic

kafka-topics.sh  --list --zookeeper hadoop:2181

Application

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe object SparkReceiver{  def main(args: Array[String]): Unit = {     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")    val ssc = new StreamingContext(sparkConf,Seconds(5))     //主要参数一    val kafkaParams = Map[String, Object](      "bootstrap.servers" -> "hadoop:9092",      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> "test",      "auto.offset.reset" -> "latest",      "enable.auto.commit" -> (false: java.lang.Boolean)    )     //主要参数二    val topics = Array("day02")    val stream = KafkaUtils.createDirectStream[String,String](      ssc,      PreferConsistent,      Subscribe[String, String](topics, kafkaParams)    )     stream.map(record => (record.key, record.value)).print()     ssc.start()    ssc.awaitTermination()  }}

 

转载地址:http://uaazi.baihongyu.com/

你可能感兴趣的文章
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>
Android 下 JNI 开发
查看>>