search(9)- elastic4s logback-appender

In an earlier post I built cassandra-appender, a logback plugin backed by cassandra. It is precisely cassandra's distributed-database nature that makes it a suitable logger for a distributed akka-cluster-sharding application, so the core function of cassandra-appender is the part that persists logback messages. Likewise, the core of an ES-based logback-appender is the process of writing messages into ES. In ES this write also performs indexing, which makes later searching and analysis of historical messages much more convenient. Let's look straight at the elastic4s code for this message-persisting part:

  def writeLog(event: ILoggingEvent)(client: ElasticClient, idx: String)(
      appName: String, ip: String, hostName: String, default: String) = {
    var content: List[(String, Any)] = List(
      APP_NAME -> appName,
      HOST_IP -> ip,
      HOST_NAME -> hostName,
      LOGGER_NAME -> event.getLoggerName(),
      LEVEL -> event.getLevel().toString,
      THREAD_NAME -> event.getThreadName(),
      LOG_DATE -> logDate,
      LOG_TIME -> logTime
    )
    try {
      val callerData = event.getCallerData()
      if (callerData.nonEmpty) {
        content = content ++ List(
          CLASS_NAME -> callerData.head.getClassName(),
          FILE_NAME -> callerData.head.getFileName(),
          LINE_NUMBER -> callerData.head.getLineNumber().toString,
          METHOD_NAME -> callerData.head.getMethodName()
        )
      }
    } catch { case e: Throwable => println(s"logging event error: ${e.getMessage}") }
    try {
      if (event.getThrowableProxy() != null) {
        // getSuppressed() returns an Array[IThrowableProxy]; convert with toList
        // (a cast to List would throw a ClassCastException at runtime)
        val throwableStrs = event.getThrowableProxy().getSuppressed().toList
        val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() }
        content = content :+ (THROWABLE_STR -> throwableStr)
      }
    } catch { case e: Throwable => println(s"logging event error: ${e.getMessage}") }
    val logmsgs = event.getMessage()
    try {
      // if the message is valid json, expand it into individual fields
      val logMap = fromJson[Map[String, String]](logmsgs)
      logMap.foreach(m => content = content :+ (m._1 -> m._2))
    } catch {
      case e: Throwable =>
        // otherwise store the raw message under log_msg plus the default field-values
        content = content :+ (MESSAGE -> logmsgs)
        try {
          val dftMap = fromJson[Map[String, String]](default)
          dftMap.foreach(m => content = content :+ (m._1 -> m._2))
        } catch { case e: Throwable => }
    }
    val newRecord = indexInto(idx).fields(content).createOnly(true)
    client.execute(newRecord) //.await
  }

As you can see, we first check whether the event.getMessage() text is in json format: if it is valid json, it is parsed into field names and values; otherwise the raw text is written into the log_msg field together with a set of default fields and values. Why? Remember that this elastic-appender is a general-purpose logback plugin that can be used in any piece of software. Different applications have different goals and methods for tracing their runtime state, so letting users define their own trace fields should be a good way to satisfy those varying requirements. The test example makes this clear:

  var loggedItems = Map[String, String]()
  loggedItems = loggedItems ++ Map(
    ("app_customer" -> "logback.com"),
    ("app_device" -> "9101"),
    ("log_msg" -> "specific message for elastic ..."))
  log.debug(toJson(loggedItems))

//logback.xml
    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中央书城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

The code above defines the custom fields app_customer, app_device and log_msg. The idea is this: logback itself only gives us the single msg field in log.info(msg). If we stored that in a database we could only classify and query on that one msg field. But since we are already using a database for storage, we would rather represent a message with more fields, such as customer id, device id, store id and so on, which makes tracing much easier. So for in-house users we can require that any extra field-value pairs needed for special purposes be encoded as json and passed to ElasticAppender for processing. For logback messages produced by third-party libraries used inside the application, we have no way to demand that they follow this format, yet they are still stored into ES; in that case the defaultFieldValues defined in logback.xml are used to fill in those extra fields.
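The json-or-fallback rule described above can be sketched in isolation. Below is a minimal version using json4s directly; the function name messageFields and the hard-coded "log_msg" key are stand-ins for the JsonConverter helpers and field constants in the real appender:

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

// a minimal sketch of "valid json object -> one field per key, otherwise log_msg + defaults"
def messageFields(msg: String, defaults: List[(String, String)]): List[(String, String)] = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  try {
    // a json object message becomes individual, queryable fields
    JsonMethods.parse(msg).extract[Map[String, String]].toList
  } catch {
    // any non-json message is stored verbatim under log_msg, plus the default fields
    case _: Throwable => ("log_msg" -> msg) :: defaults
  }
}
```

So messageFields("""{"app_customer":"x"}""", Nil) yields List("app_customer" -> "x"), while a plain-text message falls back to the log_msg field followed by the defaults.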

In this post we mainly discuss this particular elastic-appender and how to use it. First, a recap of how logback works:

Getting to know logback: the part of its operation that deserves the most attention is the handling of message levels. Levels are how logback filters which messages to record. Logback supports the following levels, ordered from weakest to strongest recording coverage:

    TRACE -> DEBUG -> INFO -> WARN -> ERROR

corresponding to the recording methods trace(msg), debug(msg), info(msg), warn(msg), error(msg). The filtering rule is: given a request method of level p and a logger whose effective level is q, the message is recorded when p >= q. In other words, a call to error(msg) is recorded under every effective level, while trace(msg) is recorded only when the effective level is TRACE. The logback manual shows it like this:

              TRACE   DEBUG   INFO    WARN    ERROR   OFF
    trace()   YES     NO      NO      NO      NO      NO
    debug()   YES     YES     NO      NO      NO      NO
    info()    YES     YES     YES     NO      NO      NO
    warn()    YES     YES     YES     YES     NO      NO
    error()   YES     YES     YES     YES     YES     NO

Each logger's default level is inherited down the logger-name hierarchy: when a logger defines no level of its own, it inherits the level of its nearest ancestor, i.e. for X.Y.Z, Z inherits its default level from Y.
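The p >= q rule and the level inheritance can both be checked programmatically. Here is a small sketch using the logback-classic API; the logger names X.Y and X.Y.Z are arbitrary and the context is created standalone so it does not disturb the configured one:

```scala
import ch.qos.logback.classic.{Level, Logger, LoggerContext}

// a fresh, standalone logger context (independent of logback.xml)
val ctx = new LoggerContext()
val logger: Logger = ctx.getLogger("X.Y.Z")

// set INFO on the parent X.Y; X.Y.Z defines no level of its own
ctx.getLogger("X.Y").setLevel(Level.INFO)

// Z inherits INFO from Y, so debug() requests are filtered out and info() passes
assert(logger.getEffectiveLevel == Level.INFO)
assert(!logger.isDebugEnabled)
assert(logger.isInfoEnabled)
```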

Now look at the following logback.xml example:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <!-- path to your log file, where you want to store logs -->
        <file>~/logback.log</file>
        <append>false</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
        <appName>POCServer</appName>
        <defaultFieldValues>{"app_customer":"999999","app_device":"9999"}</defaultFieldValues>
        <keyspaceName>applog</keyspaceName>
        <columnFamily>txnlog</columnFamily>
    </appender>

    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中央书城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

    <logger name="com.datatech" level="info" additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <logger name="com.datatech.sdp" level="info" additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <root level="info">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </root>

    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>

The configuration file above defines several appenders: STDOUT, FILE, cassandraLogger and elasticLogger. First of all, different loggers can use different appenders. cassandraLogger and elasticLogger are our custom appenders. The elasticLogger section defines the ES endpoint connection parameters such as host and port. In the ElasticAppender source, connecting to and closing the elastic endpoint looks like this:


  override def start(): Unit = {
    if (!_hosts.isEmpty) {
      connectES()
      super.start()
    }
  }

  override def stop(): Unit = {
    if (optESClient.isDefined) {
      (optESClient.get).close()
      optESClient = None
    }
    super.stop()
  }

  def connectES(): Unit = {
    try {
      val url = _hosts + ":" + _port.toString
      val esjava = JavaClient(ElasticProperties(url))
      val client = ElasticClient(esjava)
      optESClient = Some(client)
    } catch {
      case e: Throwable => optESClient = None
    }
  }

Note that if host is defined in logback.xml, the connection is made automatically when ElasticAppender is instantiated; otherwise you must call logger.start() manually to connect to ES. The properties in the xml file are injected through the corresponding setters (getHost is provided so code can check whether host was configured), as follows:

 private var _hosts: String = ""
  def setHost(host: String): Unit = _hosts = host
  def getHost : String = _hosts

  private var _port: Int = 9200
  def setPort(port: Int): Unit = _port = port

  private var _idxname: String = "applog"
  def setIndexName(indexName: String): Unit = _idxname = indexName

  private var _username: String = ""
  def setUsername(username: String): Unit = _username = username

  private var _password: String = ""
  def setPassword(password: String): Unit = _password = password

  private var _defaultFieldValues: String = ""
  def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues

Below is a demonstration of using ElasticAppender (first put logback_persist.jar into the lib directory):

import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import ch.qos.logback.classic.Logger
import ch.qos.logback.core.{ConsoleAppender, FileAppender}
import com.datatech.logback.{CassandraAppender, ElasticAppender, JsonConverter}
import ch.qos.logback.classic.spi.ILoggingEvent
import org.slf4j.LoggerFactory
import ch.qos.logback.classic.LoggerContext
import java.time._
import java.time.format._
import java.util.Locale

object ElasticAppenderDemo extends App with JsonConverter {
  val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
  val elasticAppender = log.getAppender("elasticLogger").asInstanceOf[ElasticAppender]
  val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]]
  val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]]
  val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender]

  //stop other appenders
  if (stdoutAppender != null) stdoutAppender.stop()
  if (fileAppender != null) fileAppender.stop()
  if (cassAppender != null) cassAppender.stop()

  //check if host not set in logback.xml
  if (elasticAppender != null) {
    if (elasticAppender.getHost.isEmpty) {
      elasticAppender.setHost("http://localhost")
      elasticAppender.setPort(9200)
      elasticAppender.start()
    }
  }

  val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
  val now = LocalDateTime.now.format(dateTimeFormatter)

  (1 to 100).foreach { idx => log.info(s"************this is a info message $idx ") }
  log.debug("***********debugging message here ..." + now)

  //custom field-value pairs passed through as json (see the earlier example)
  var loggedItems = Map[String, String]()
  loggedItems = loggedItems ++ Map(
    ("app_customer" -> "logback.com"),
    ("app_device" -> "9101"),
    ("log_msg" -> "specific message for elastic ..."))
  log.debug(toJson(loggedItems))

  //stop the logger
  val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
  loggerContext.stop()
}

At appender-lookup time, the "elasticLogger" in getAppender("elasticLogger") is the appender name from the xml file. If host and port are not defined in the xml file, they can be set manually in code with the setters setHost and setPort. loggerContext.stop() closes all appenders at once, including their database connections; an individual appender can also be closed on its own with elasticAppender.stop().

We can also use elastic4s to define a custom mapping (table structure) for the index, as follows:

    val esjava = JavaClient(ElasticProperties("http://localhost:9200"))
    val client = ElasticClient(esjava)

    //delete the index if it already exists
    val rspExists = client.execute(indexExists("applog")).await
    if (rspExists.result.exists) client.execute(deleteIndex("applog")).await

    //create the index
    val idxCreate = client.execute(createIndex("applog").shards(1).replicas(1)).await

    //define the mapping
    if (idxCreate.isSuccess) {
      val applogMapping = client.execute(
        putMapping("applog").fields(
          textField("class_name"),
          textField("file_name"),
          ipField("host_ip"),
          textField("host_name"),
          keywordField("level"),
          keywordField("line_number"),
          keywordField("logger_name"),
          keywordField("method_name"),
          keywordField("thread_name"),
          textField("throwable_str_rep"),
          dateField("log_date").format("basic_date").ignoreMalformed(true),
          dateField("log_time").format("basic_date_time").ignoreMalformed(true),
          textField("log_msg"),
          keywordField("app_name"),
          keywordField("app_customer"),
          keywordField("app_device")
        )).await
      if (applogMapping.isSuccess) println(s"mapping successfully created.")
      else println(s"mapping creation error: ${applogMapping.error.reason}")
    } else {
      println(s"index creation error: ${idxCreate.error.reason}")
    }
    client.close()
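With messages indexed under this mapping, historical logs can be queried back with the same elastic4s DSL. The sketch below assumes a running node at http://localhost:9200 and the applog index defined above; the query values are examples only:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}

val client = ElasticClient(JavaClient(ElasticProperties("http://localhost:9200")))

// all ERROR-level messages for one customer, newest first;
// app_customer and level are keyword fields, so termQuery matches exactly
val rsp = client.execute(
  search("applog")
    .query(boolQuery().must(
      termQuery("app_customer", "中央书城"),
      termQuery("level", "ERROR")))
    .sortByFieldDesc("log_time")
    .limit(20)
).await

rsp.result.hits.hits.foreach(h => println(h.sourceAsString))
client.close()
```

This is one of the main payoffs of writing logs into ES rather than a plain file: the custom fields become first-class query criteria.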

The dependencies are declared in build.sbt:

name := "logback-persist-demo"
version := "0.1"
scalaVersion := "2.12.9"

val elastic4sVersion = "7.6.0"

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
  "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,
  // for the default http client
  "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.typelevel" %% "cats-core" % "2.0.0-M1",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

 

Original article, author: admin. If reposting, please credit the source: https://www.2lxm.com/archives/7784.html