本文共 1975 字,大约阅读时间需要 6 分钟。
Flume配置文件
# Flume agent "a1": tail the Hive log with an exec source, buffer events in
# an in-memory channel, and publish them to the Kafka topic "day02".
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Exec source: continuously run "tail -F" on the Hive log.
# FIX: the exec source takes its shell command via "command" — "bind" is a
# netcat/avro-source property and is ignored by the exec source, so the
# original config would start an agent that ships nothing.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/hive.log

# Kafka sink: batch up to 20 events per send; acks=1 (leader-only ack).
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = day02
a1.sinks.k1.brokerList = hadoop:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# In-memory channel (fast, but events are lost if the agent dies).
a1.channels.c1.type = memory

# Wire source -> channel -> sink.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动Flume
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file /opt/flume/flume/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
启动Kafka
kafka-server-start.sh /opt/kafka/kafka/config/server.properties
创建Topic
kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic day02
查看Topic
kafka-topics.sh --list --zookeeper hadoop:2181
Application
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Spark Streaming job that consumes the Kafka topic "day02" through the
 * direct (receiver-less) kafka010 integration and prints each (key, value)
 * pair of every 5-second micro-batch to stdout.
 */
object SparkReceiver {

  def main(args: Array[String]): Unit = {
    // local[2]: one thread drives the streaming scheduler, one processes batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Kafka consumer settings: auto-commit is disabled (offsets would be
    // managed manually in a production job); a fresh consumer group starts
    // from the latest offsets.
    val consumerConfig = Map[String, Object](
      "bootstrap.servers" -> "hadoop:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("day02")

    // Direct stream: each Spark partition consumes one Kafka partition;
    // PreferConsistent spreads partitions evenly across executors.
    val records = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, consumerConfig)
    )

    records.map(r => (r.key, r.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
转载地址:http://uaazi.baihongyu.com/