import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// TODO: a DStream can be converted to an RDD for processing.
// DStream => old RDD => new RDD => new DStream
val resultDS: DStream[(String, Int)] = socketDS.transform(
    rdd => {
        val flatRDD = rdd.flatMap(_.split(" "))
        val mapRDD = flatRDD.map((_, 1))
        val reduceRDD = mapRDD.reduceByKey(_ + _)
        reduceRDD
    }
)
resultDS.print()
// TODO: transform exposes the underlying RDD for custom processing.
// TODO: transform can also run driver-side code periodically, once per batch interval.
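A minimal sketch of that second point; the numbered markers are illustrative labels, not Spark API:

socketDS.transform(
    rdd => {
        // (1) Driver side: runs once per batch interval, so it can
        //     periodically refresh rules, configuration, and so on.
        rdd.map(data => {
            // (2) Executor side: runs once per record.
            data
        })
    }
)
// (3) Code placed here, outside transform, runs on the driver exactly once,
//     when the DStream graph is built.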
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("scp")
val socketDS = ssc.socketTextStream("localhost", 9999)
val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
val wordToOneDS: DStream[(String, Int)] = wordDS.map((_, 1))
// TODO: use the stateful operation updateStateByKey to keep state across batches.
// Spark Streaming keeps state via checkpointing, so a checkpoint path must be set.
val wordToCountDS: DStream[(String, Long)] = wordToOneDS.updateStateByKey[Long](
    // TODO: the first parameter is the collection of values for the same key in this batch.
    // TODO: the second parameter is the buffered (previous) state for that key.
    (seq: Seq[Int], buffer: Option[Long]) => {
        // TODO: the return value is the updated state for the key.
        val newBufferValue = buffer.getOrElse(0L) + seq.sum
        Option(newBufferValue)
    }
)
wordToCountDS.print()
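None of these snippets starts the streaming context on its own; each needs the usual driver tail:

ssc.start()            // start the receivers and the batch scheduler
ssc.awaitTermination() // block the main thread until the context stops

Test input can then be typed into a terminal running netcat against the port assumed throughout these examples: nc -lk 9999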
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Sliding window
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val wordToOneDS: DStream[(String, Int)] = socketDS
        .flatMap(_.split(" "))
        .map((_, 1))
    val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        Seconds(6), // window duration
        Seconds(3)  // slide duration
    )
    windowDS.print()
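reduceByKeyAndWindow is shorthand for windowing first and reducing afterwards; an equivalent hand-written form (a sketch, using the same durations as above, both of which must be multiples of the 3-second batch duration):

    val windowDS2: DStream[(String, Int)] = wordToOneDS
        .window(Seconds(6), Seconds(3))
        .reduceByKey(_ + _)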
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("scp")

    // Sliding window
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val wordToOneDS: DStream[(String, Int)] = socketDS.map(num => ("a", 1))
    val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
        // Reduce function: fold the data that entered the window into the result.
        (x: Int, y: Int) => {
            val sum = x + y
            println(sum + "=" + x + "+" + y)
            sum
        },
        // Inverse reduce function: subtract the data that slid out of the window.
        (x: Int, y: Int) => {
            val diff = x - y
            println(diff + "=" + x + "-" + y)
            diff
        },
        Seconds(6),
        Seconds(3)
    )
    windowDS.print()
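The two-function variant computes each window incrementally: previous window result, plus the batch that just entered, minus the batch that just slid out. That is cheaper for long, heavily overlapping windows, but it requires the checkpoint set above, and keys whose count drops to 0 stay in the state unless filtered out. A sketch of the overload that also takes a partition count and a filter function (both parameters exist in the API; the value 2 here is an arbitrary choice):

    val windowDS2: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x - y,
        Seconds(6),
        Seconds(3),
        2,                               // number of partitions
        (kv: (String, Int)) => kv._2 > 0 // drop keys whose count fell to 0
    )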
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("scp")

    // Sliding window
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    // Count the records in each window; the running count is kept via checkpointing.
    val countDS: DStream[Long] = socketDS.countByWindow(Seconds(6), Seconds(3))
    countDS.print()
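Internally, countByWindow maps every record to 1 and keeps a running total with an incremental add/subtract window reduce, which is why the checkpoint directory is mandatory here. Roughly (a sketch of the equivalent hand-written version):

    val countDS2: DStream[Long] = socketDS
        .map(_ => 1L)
        .reduceByWindow(_ + _, _ - _, Seconds(6), Seconds(3))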
import java.sql.{DriverManager, PreparedStatement}

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val socketDS = ssc.socketTextStream("localhost", 9999)

    // Save the data into a MySQL database.
    // Input format: id,name,age
    socketDS.foreachRDD(rdd => {
        rdd.foreach(data => {
            // Note: everything below, including opening and closing the
            // connection, runs once per record.
            val datas = data.split(",")
            val id = datas(0).toInt
            val name = datas(1)
            val age = datas(2).toInt

            // TODO: load the JDBC driver.
            Class.forName("com.mysql.jdbc.Driver")
            // TODO: open the connection and create the statement.
            val conn = DriverManager.getConnection(
                "jdbc:mysql://linux1:3306/rdd", "root", "000000")
            val sql = "insert into user (id, name, age) values (?, ?, ?)"
            val statement: PreparedStatement = conn.prepareStatement(sql)
            statement.setInt(1, id)
            statement.setString(2, name)
            statement.setInt(3, age)
            // TODO: execute the insert.
            statement.executeUpdate()
            // TODO: close the connection.
            statement.close()
            conn.close()
            println("Data saved successfully!!!")
        })
    })
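Opening and closing a JDBC connection for every single record is the performance problem flagged in the note above. The next two versions move the connection out of the per-record loop; the first attempt, below, fails for a different reason.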
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val socketDS = ssc.socketTextStream("localhost", 9999)

    // Save the data into a MySQL database.
    // Input format: id,name,age

    // TODO: load the JDBC driver.
    Class.forName("com.mysql.jdbc.Driver")
    // TODO: open the connection and create the statement, here on the driver.
    // TODO: connection objects do not support serialization.
    val conn = DriverManager.getConnection(
        "jdbc:mysql://linux1:3306/rdd", "root", "000000")
    val sql = "insert into user (id, name, age) values (?, ?, ?)"
    val statement: PreparedStatement = conn.prepareStatement(sql)

    socketDS.foreachRDD(rdd => {
        // TODO: RDD methods are operators: they run distributed, so their
        // closures are checked for serializability before tasks are shipped.
        rdd.foreach(data => {
            val datas = data.split(",")
            val id = datas(0).toInt
            val name = datas(1)
            val age = datas(2).toInt
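This version does not even run: conn and statement are created on the driver, but rdd.foreach ships its closure to the executors, and the job fails the closure check with a Task not serializable error (caused by java.io.NotSerializableException) because JDBC connection objects cannot be serialized. The fix is to create the connection on the executor side, once per partition rather than once per record: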
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val socketDS = ssc.socketTextStream("localhost", 9999)

    // Save the data into a MySQL database.
    // Input format: id,name,age

    socketDS.foreachRDD(rdd => {
        // [Note] the difference between mapPartitions and foreachPartition:
        //   mapPartitions    => transforms partition by partition and returns a new RDD
        //   foreachPartition => iterates partition by partition, no return value
        // rdd.mapPartitions()
        rdd.foreachPartition(datas => {
            // TODO: load the JDBC driver.
            Class.forName("com.mysql.jdbc.Driver")
            // TODO: connection objects are not serializable, so the connection
            // is created here, on the executor, once per partition.
            val conn = DriverManager.getConnection(
                "jdbc:mysql://linux1:3306/rdd", "root", "000000")
            val sql = "insert into user (id, name, age) values (?, ?, ?)"
            val statement: PreparedStatement = conn.prepareStatement(sql)

            // datas is a plain Scala iterator here, so no distributed
            // computation is involved.
            datas.foreach(data => {
                val fields = data.split(",")
                val id = fields(0).toInt
                val name = fields(1)
                val age = fields(2).toInt
                statement.setInt(1, id)
                statement.setString(2, name)
                statement.setInt(3, age)
                statement.executeUpdate()
            })

            // Close the connection once per partition.
            statement.close()
            conn.close()
        })
    })
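One connection per partition amortizes the setup cost well. For long-running jobs, the usual next step is a connection pool (for example HikariCP or Druid) held in a lazily initialized singleton on each executor, so that successive partitions reuse connections instead of reopening them.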
import java.sql.{DriverManager, PreparedStatement, ResultSet}
import org.apache.spark.streaming.StreamingContextState

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
// TODO: configure graceful shutdown.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(sparkConf, Seconds(5))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
val wordToOneDS: DStream[(String, Int)] = wordDS.map((_, 1))
val wordToCountDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_ + _)
wordToCountDS.print()
ssc.start()
new Thread(new Runnable {
    override def run(): Unit = {
        // TODO: Spark Streaming can be stopped, but where should the stop logic live?
        // TODO: stop() must not be called from the driver's main thread, and calling
        // TODO: ssc.stop() unconditionally is not enough: we poll a flag that says
        // TODO: whether the application should shut down.
        while (true) {
            // TODO: a flag on the driver side would work, but when and by whom it gets
            // TODO: updated would be unclear. So the flag usually lives in a third-party
            // TODO: system instead: Redis, ZooKeeper, MySQL, HDFS, ...

            // TODO: load the driver, open a connection, and read the shutdown flag.
            Class.forName("com.mysql.jdbc.Driver")
            val conn = DriverManager.getConnection(
                "jdbc:mysql://linux1:3306/rdd", "root", "000000")
            val sql = "select age from user where id = 1"
            val statement: PreparedStatement = conn.prepareStatement(sql)
            val rs: ResultSet = statement.executeQuery()
            rs.next()
            val age: Int = rs.getInt(1)
            rs.close()
            statement.close()
            conn.close()

            if (age <= 20) {
                // TODO: check the state of the StreamingContext.
                val state: StreamingContextState = ssc.getState()
                if (state == StreamingContextState.ACTIVE) {
                    println("Stopping the Spark Streaming context...")
                    // TODO: stop gracefully: finish processing the data already
                    // received, then stop (stopSparkContext = true, stopGracefully = true).
                    ssc.stop(true, true)
                    System.exit(0)
                }
            }

            Thread.sleep(1000 * 5) // poll every 5 seconds
        }
    }
}).start()
ssc.awaitTermination()
// TODO: how does a Thread stop? When its run method finishes.
// Why not call Thread.stop()? Because it can corrupt shared state:
// an operation like i++ is really 1) read i, 2) write i + 1, and killing
// the thread between the two steps leaves the data in an unsafe state.
// new Thread().stop()
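To trigger the shutdown from outside, flip the flag the monitor thread polls. With the flag modeled as the age column of the user row with id 1, as in the query above, running update user set age = 10 where id = 1 in MySQL makes the next poll see age <= 20, so the monitor thread stops the context gracefully. Independently of that, with spark.streaming.stopGracefullyOnShutdown set to true, a normal termination signal to the driver also drains the batches already received before exiting.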