Handling complex JSON in Scala

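This note focuses on parsing canal-json messages in a Flink stream job. After several experiments, Alibaba's fastjson turned out to be the easiest library to work with, so the approach is recorded here.

Add the dependency: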

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.33</version>
</dependency>

Sample data (JSON):
{
    "data": [
        {
            "name": "张三", 
            "age": "22", 
            "xb": ""
        }
    ], 
    "database": "test", 
    "es": 1623029629000, 
    "id": 1, 
    "isDdl": false, 
    "mysqlType": {
        "name": "varchar(500)", 
        "age": "int(11)", 
        "xb": "varchar(20)"
    }, 
    "old": null, 
    "pkNames": null, 
    "sql": "", 
    "sqlType": {
        "name": 12, 
        "age": 4, 
        "xb": 12
    }, 
    "table": "mysql_result1", 
    "ts": 1623029681690, 
    "type": "INSERT"
}
Example: a Flink stream job reads the JSON messages from Kafka, keeps only the INSERT events, and extracts the row data. The code is as follows:
package it.bigdata.flink.study.test

import java.util.Properties

import com.alibaba.fastjson.JSON
import it.bigdata.flink.study.test.entity.MysqlResult
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


object SteamBinLog {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Read the raw canal-json messages from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers","10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
    //    props.setProperty("group.id","consumer-group")
    val stream3 = env.addSource(new FlinkKafkaConsumer[String]("test_mysql_result1", new SimpleStringSchema(), props)
    .setStartFromEarliest())
    val stream4 = stream3
      .map(data => JSON.parseObject(data))
      // Keep INSERT events only; == is null-safe, so a message without "type" is dropped instead of throwing
      .filter(json => json.getString("type") == "INSERT")
      .map(data => {
        // Read the first row of the "data" array directly as a JSONObject
        val json = data.getJSONArray("data").getJSONObject(0)
        val my = new MysqlResult()
        my.setName(json.getString("name"))
        // "age" arrives as the string "22"; getIntValue converts it to an Int
        my.setAge(json.getIntValue("age"))
        my.setXb(json.getString("xb"))
        my
      })

    stream4.print()


    env.execute("stream binlog")
  }


}
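
The filter-and-extract step can also be tried without Flink or Kafka. The following is a minimal sketch, assuming only the fastjson dependency above is on the classpath. `CanalInsertRows`, `Row` and `extractInsertRows` are hypothetical names introduced for illustration; `Row` stands in for the `MysqlResult` entity, whose definition is not shown in this post. Unlike the job above, it returns every row of the `data` array rather than only the first one.

```scala
import com.alibaba.fastjson.JSON

object CanalInsertRows {
  // Hypothetical row type standing in for the MysqlResult entity (not shown in the post).
  final case class Row(name: String, age: Int, xb: String)

  /** Returns all rows of an INSERT message, or Nil for any other message. */
  def extractInsertRows(raw: String): List[Row] = {
    val msg = JSON.parseObject(raw)
    // "data" may be absent or null (for example on DDL events), so guard before reading it.
    val rows = if (msg == null) null else msg.getJSONArray("data")
    if (rows == null || msg.getString("type") != "INSERT") Nil
    else (0 until rows.size).toList.map { i =>
      val r = rows.getJSONObject(i)
      // getIntValue converts the string "22" to the Int 22.
      Row(r.getString("name"), r.getIntValue("age"), r.getString("xb"))
    }
  }
}
```

Inside the Flink job, a function of this shape could be passed to `flatMap` so that messages carrying several rows are not truncated to the first one.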

The rest of this note covers further JSON operations, copied from the original article: https://segmentfault.com/a/1190000039415392

1. Converting data to JSON

import org.json4s.NoTypeHints
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization._
import org.json4s.jackson.Serialization
import scala.collection.mutable

case class WOE(col: String, woe: Map[String, String])
implicit val formats = Serialization.formats(NoTypeHints)
// += needs a mutable Map; the default Scala Map is immutable
val testMap = mutable.Map[String, String]()
testMap += ("1" -> "1.1")
// WOE expects an immutable Map, so convert before constructing it
val a = WOE("1", testMap.toMap)
println(write(a))

Output: {"col":"1","woe":{"1":"1.1"}}

2. Parsing JSON

implicit val formats = Serialization.formats(NoTypeHints)
val js =
"""
{"col":"1","woe":{"1":"1.1"}}
"""
val ab = parse(js).extract[WOE]
println(write(ab)) 

A List works the same way:

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

implicit val formats = Serialization.formats(NoTypeHints)

// Serialize a list of case classes
val b = new ListBuffer[WOE]
val testMap = mutable.Map[String, String]()
testMap += ("1" -> "1.1")
b += WOE("1", testMap.toMap)
b += WOE("3", testMap.toMap)
println(write(b))

// Parse a JSON array back into a List[WOE]
val js =
  """
[{"col":"1","woe":{"1":"1.1"}},{"col":"3","woe":{"1":"1.1"}}]
  """
val ab = parse(js).extract[List[WOE]]
println(ab.toString)

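1. Scala's built-in JSON support
Scala 2.10 ships its own JSON support, scala.util.parsing.json.JSON (the author has not checked other versions; from 2.11 on it lives in the separate scala-parser-combinators module and is deprecated).
Object to JSON:

import scala.collection.mutable

val testMap = mutable.Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
// JSONObject takes an immutable Map, so convert first
println(scala.util.parsing.json.JSONObject(testMap.toMap))

However, it seems to handle only maps, and the map has to be converted to an immutable one.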

2. fastjson

Parsing JSON:

import com.alibaba.fastjson.JSON
object JsonDemo {
  def main(args: Array[String]): Unit = {
    // A triple-quoted string avoids escaping the inner double quotes
    val text = """{"name":"name1", "age":55}"""
    val json = JSON.parseObject(text)
    println(json.get("name"))
    println(json.get("age"))
  }
}

Another example:

import com.alibaba.fastjson.JSON

object Json {
  def main(args: Array[String]): Unit = {
    val str2 = """{"et":"kanqiu_client_join","vtm":1435898329434,"body":{"client":"866963024862254","client_type":"android","room":"NBA_HOME","gid":"","type":"","roomid":""},"time":1435898329}"""
    val json = JSON.parseObject(str2)
    // Get a member as a plain Object
    val fet = json.get("et")
    // Get a member as a String
    val etString = json.getString("et")
    // Get an integer member; vtm is a millisecond timestamp that does not fit in an Int, so read it as a Long
    val vtm = json.getLongValue("vtm")
    println(vtm)
    // Get a nested member
    val client = json.getJSONObject("body").get("client")
    println(client)
  }
}

In Spark Streaming, fastjson has proven more stable in the author's experience: json-lib frequently runs into unexplained problems, and fastjson also parses faster.

Object to JSON: the arguments must be given explicitly, otherwise the compiler reports

ambiguous reference to overloaded definition

For example:

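import com.alibaba.fastjson.JSON
import scala.collection.mutable

val testMap = mutable.Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
// The explicit second argument (prettyFormat = true) selects one overload unambiguously
val a = JSON.toJSONString(testMap, true)
println(a)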

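This raises no error, but the output is odd. fastjson does not recognize a Scala Map as a java.util.Map, so it serializes it as a bean through its is*/get* methods and the entries are lost: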

{
    "empty":false,
    "sizeMapDefined":false,
    "traversableAgain":true
} 
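
One way around this is to hand fastjson a java.util.Map view of the Scala map. The following is a minimal sketch, assuming the fastjson dependency above and the `scala.collection.JavaConverters` that ship with Scala 2.11/2.12 (in 2.13 the same `asJava` is provided by `scala.jdk.CollectionConverters`). `ScalaMapToJson` and `toJson` are hypothetical names used only for illustration.

```scala
import com.alibaba.fastjson.JSON
import scala.collection.JavaConverters._
import scala.collection.mutable

object ScalaMapToJson {
  /** Serializes a Scala map as a JSON object by wrapping it in a java.util.Map view. */
  def toJson(m: scala.collection.Map[String, String]): String = {
    // asJava does not copy; it wraps the Scala map so fastjson sees a java.util.Map.
    val javaMap: java.util.Map[String, String] = m.asJava
    // The explicit prettyFormat argument avoids the ambiguous-overload error.
    JSON.toJSONString(javaMap, false)
  }

  def main(args: Array[String]): Unit = {
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "2.034")
    println(toJson(testMap)) // prints {"1":"2.034"}
  }
}
```

With more than one entry the key order follows the iteration order of the underlying Scala map, so it should not be relied on.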

3. json4s

Object to JSON:

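import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import scala.collection.mutable

val testMap = mutable.Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
val jj = compact(render(testMap))
println(jj)

Output (the mutable Map is rendered as a sequence of key/value pairs, hence a JSON array rather than a single object):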

[{"2":"2.0134"},{"1":"2.034"}]

If all values are Strings, nested Map structures can be handled as well:

import scala.collection.mutable

val testMap = mutable.Map[String, mutable.Map[String, String]]()
val subMap = mutable.Map[String, String]()
subMap += ("1" -> "1.1")
testMap += ("1" -> subMap)
println(write(testMap))

Output: {"1":{"1":"1.1"}}
However, this form is inconvenient to parse back.

Another example:

implicit val formats = Serialization.formats(NoTypeHints)
val m = Map(
  "name" -> "john doe",
  "age" -> 18,
  "hasChild" -> true,
  "childs" -> List(
    Map("name" -> "dorothy", "age" -> 5, "hasChild" -> false),
    Map("name" -> "bill", "age" -> 8, "hasChild" -> false)))
val mm = Map(
  "1" -> Map("1" -> "1.2")
)
// Serialize the two maps defined above
println(write(m))
println(write(mm))

TEST

package com.dfssi.dataplatform.analysis.exhaust.alarm

import java.sql.Timestamp
import java.util

import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.spark.Logging
import org.json4s.NoTypeHints

// The data to be parsed
case class NeedEntity(val vin: String,
                      val downoutput: Double,
                      val collectTime: Long,
                      val lon: Double,
                      val lat: Double,
                      val failureList: java.util.List[Integer] = new util.ArrayList[Integer]()
                     ) extends Serializable

// State management
// Event handling: state is tracked per event

class OverLimitEvent(var vin: String,
                     var startTime: Long,
                     var startLon: Double,
                     var startLat: Double,
                     var eventType:String="overlimit",
                      var endTime: Long = 0,
                      var endLon: Double = 0.0,
                      var endLat: Double = 0.0,
                      var minValue: Double = 0.0,
                      var maxValue: Double = 0.0
                         ) extends Serializable with Logging{

  def getInsertMap(): Map[String, Any] = {
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "startLon" -> startLon,
      "startLat" -> startLat
    )
  }

  def getUpdateMap(): Map[String, Any] = {
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "endTime" -> new Timestamp(endTime),
      "endLon" -> endLon,
      "endLat" -> endLat,
      "maxValue" -> maxValue,
      "minValue" -> minValue
    )
  }

  def updateByEntity(entity: NeedEntity) = {
    this.endTime = entity.collectTime
    this.endLat = entity.lat
    this.endLon = entity.lon
    // maxValue and minValue are primitive Doubles, so a null check is meaningless
    if (this.maxValue < entity.downoutput) {
      this.maxValue = entity.downoutput
    }
    if (this.minValue > entity.downoutput) {
      this.minValue = entity.downoutput
    }

  }

  override  def toString(): String ={
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }

}

object OverLimitEvent {
  val ID_FIELD = Array("vin", "startTime")
  def apply(
             vin: String,
             startTime: Long,
             startLon: Double,
             startLat: Double,
             endTime: Long,
             endLon: Double,
             endLat: Double,
             minValue: Double,
             maxValue: Double
           ): OverLimitEvent = {
    val event = new OverLimitEvent(vin, startTime, startLon, startLat)
    event.endTime = endTime
    event.endLat = endLat
    event.endLon = endLon
    event.maxValue = maxValue
    event.minValue = minValue
    event
  }

  def buildByEntity(entity: NeedEntity): OverLimitEvent = {
    new OverLimitEvent(entity.vin, entity.collectTime, entity.lon, entity.lat)
  }

  def buildByJson(json: String): OverLimitEvent = {
    com.alibaba.fastjson.JSON.parseObject(json, classOf[OverLimitEvent])
  }

  override  def toString(): String ={
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }

}

case class ExhaustAlarmStatus(val vin: String, var overLimitEvent: OverLimitEvent=null,var faultEvent:Map[String,OverLimitEvent]=null, var lastTime: Long) {
  override  def toString(): String ={
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }
}

object ExhaustAlarmStatus {
  def buildByJson(json: String): ExhaustAlarmStatus = {
    if(json!=null){
        com.alibaba.fastjson.JSON.parseObject(json,
        classOf[ExhaustAlarmStatus])
    }else{
      null
    }
  }

  def toJSON(state: ExhaustAlarmStatus): String = com.alibaba.fastjson.JSON.toJSONString(state, SerializerFeature.PrettyFormat)

  def main(args: Array[String]): Unit = {
    val json = """{"vin":"222", "OverLimitEvent":{ "vin":"222",  "startTime":123456789, "startLon":1.0, "startLat":1.0, "endTime":123456789, "endLon":1.0, "endLat":1.0, "minValue":1.0, "maxValue":1.0 },"lastTime":1556441242000}"""
    val state  = com.alibaba.fastjson.JSON.parseObject(json,
      classOf[ExhaustAlarmStatus])
    println(state.overLimitEvent)
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    val jsonstr = write(state)
    println(jsonstr)
  }

}
 
