阿里云安全(en)

使用阿里云InfluxDB®和Spark Streaming实时处理时序数据(en)

2019-11-27 00:00:00 mimukeji

本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
image
在大数据处理中,一个主要的趋势是人们希望看到metric是如何随着时间变化发展。这使得管理和处理时序数据(数值随时间变化的数据)成为数据科学家非常重要的研究方向。目前,已经有非常多的时序处理数据库产品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因为完整的生态、类SQL的查询语言以及简单快捷的布署而非常受用户喜爱,居于DBEngine时序数据排列首位。阿里云已经将其进行开源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流处理报警组件。

阿里云InfluxDB®

关于时序数据的一些重要概念和如何购买阿里云InfluxDB®可以参考之前的文章<阿里云InfluxDB®教你玩转A股数据>和官方文档。这里补充一下阿里云InfluxDB®提供的实例规格和管理帐号的创建。
image
当前,阿里云InfluxDB®大致提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大致6种规格,每种规格的读写能力参考如上图所示。阿里云InfluxDB®开放了开源版的几乎全部功能,用户可以在控制台创建管理员帐号,该帐号可以通过客户端和SDK进行所有的操作。
image

Writing Data From Spark

Spark是目前大数据处理领域中最流行、最高效的开源工具,通过spark structured streaming写数据到InfluxDB的开源适配器主要有chroniclerreactive-influx。chronicler与reactive-influx的区别是,在写入数据点之前,chronicler必须要将数据格式转换成influxdb行协议,在处理大量field字段和字符串值时会变得相当棘手,相较而言reactive-influx比较方便。
在sbt项目中引入reactive-influx:

libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)

InfluxDB entry 配置,其中阿里云InfluxDB®内网和公网的URL可以在控制台上找到:

reactiveinflux {
  url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
  spark {
    batchSize = 1000 // No of records to be send in each batch
  }
}

扩展spark foreach writer, enable spark stuctured streaming 向阿里云InfluxDB®写数据的伪代码如下:

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
        
    var  db:SyncReactiveInfluxDb = _
    implicit val awaitAtMost = 1.second
        
    // Define the database connection here
    def open(partitionId: Long, version: Long): Boolean = {
    val syncReactiveInflux =
    SyncReactiveInflux(ReactiveInfluxConfig(None))
    db = syncReactiveInflux.database(dbName);
    db.create() // create the database 
    true
  }
  
    // Write the process logic, and database commit code here
    def process(value: org.apache.spark.sql.Row): Unit = {
    val point = Point(
      time = time,  // system or event time 
      measurement = "measurement1",
      tags = Map(
        "t1" -> "A", 
        "t2" -> "B"
      ),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    
    db.write(point)
  }
  
  // Close connection here
  def close(errorOrNull: Throwable): Unit = {
  }
}

引入Writer:

val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
                                    .writeStream
                                    .foreach(influxWriter)
                                    .outputMode("append")
                                    .start()

可视化

数据写入InfluxDB之后,便可以利用各种工具进行数据可视化,如Grafana,Chronograf等。一个简单的可视化展示如下:
image
当前阿里云InfluxDB®已经自带Grafana数据可视化,用户只需要在控制台一键开通既可,具体可以参考<5分钟快速完成监控系统搭建之实践篇>

总结

目前InfluxDB已经在阿里云完全托管,被用户广泛使用。随着商业化时间的发展,我们在提高稳定性和性能的同时,功能也一步步丰富起来。当前已经提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全兼容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆盖的业务场景包括DevOps监控、车联网、智慧交通、金融和IOT传感器数据采集,欢迎大家试用并提供意见。
阿里云InfluxDB®为用户提供7*24小时服务,欢迎加入下面的钉钉群咨询。
image

参考文献

  1. Processing Time Series Data in Real-Time with InfluxDB and Structured Streaming
  2. 阿里云InfluxDB®教你玩转A股数据
  3. chronicler-spark
  4. reactiveinflux
(en)

阿里云优惠新机+优惠券

本文转载自网络,如有侵权,请联系我们删除。

Home

About

product

success

news

form

bbs

contact

工单(en)

阿里云报价咨询(en)