Monitoreando logs de zimbra con ELK stack y grafiado en grafana. 2da parte

Este artículo es la continuación del tutorial Instalar ELK Stack (Elasticsearch, Logstash y Kibana) para el monitoreo de Logs. 1era Parte escrito por Alexander(AKA @Alexminator) y podemos considerarlo como una de las partes del mismo. Alexander(AKA @Alexminator) solo trato el proceso de instalación y como tunear el elasticsearch para un mejor rendimiento del mismo.

Solo usaremos kibana para algunas cosas, tales como aplicacion de plantillas y para ver que toda la generación de los índices esta poniendo las etiquetas a los logs capturados desde los equipos remotos. La generación de los Dashboards la haremos en Grafana que en un artículo anterior de este blog vimos ya su proceso de instalación pero para recoger las métricas de centreon.
El escenario es el siguiente. Vamos a enviar los logs de zimbra a logstash y este se encargará de filtrarlo, etiquetear los elementos del logs necesarios y almacenar en los indices de elasticsearch.
Entonces manos a la obra. Lo primero será descargarnos el paquete filebeat el cual se instala en el equipo del cual queremos procesar los logs en este caso nuestro server de correo Zimbra.
Como bien explica Alexander(AKA @Alexminator) en su artículo no esta permitido para nuestro país de todas maneras doy el enlace de descarga para que lo hagan por las vias posibles.

# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.4.1-amd64.deb

Ya descargado lo instalamos en el servidor zimbra.

#dpkg -i filebeat-5.4.1-amd64.deb

Ya lo tenemos instalado, luego regresaremos para configurarlo.
Vamos ahora ir al servidor de elasticsearch para configurar logstash que es lo más importante como decíamos anteriormente.
Vamos a crear el certificado para que la comunicación entre los equipos remotos con logstash sea segura.

#cd /etc/logstash/
#openssl req -subj /CN=equipo.dominio.cu -x509 -days 3650 -batch -nodes -newkey rsa:4096 -keyout logstash.key -out logstash.crt
#cd /root

donde equipo.dominio.cu es el equipo donde tenemos el logstash instalado.
Como vamos a procesar los logs de zimbra nos descargaremos los patrones y filtros que ya estan definidos para el y le haremos alguna que otra modificación. Instalamos git para clonar el repositorio de archivos

#apt-get install git

y seguidamente lo clonamos

#git clone https://github.com/ITLinuxCL/zimbra_logstash

Veremos que se crea una carpeta llamada zimbra_logstash y dentro estaran ya los  filtros y patrones que vamos a utilizar en nuestro procesamiento de nuestros logs de zimbra.
Antes vamos a crear dos ficheros importantes que son el input y el output. Debemos tener en cuenta que los ficheros Input, Filters y Output se ubican en la carpeta /etc/logstash/conf.d

#nano /etc/logstash/conf.d/filebeat-input.conf

con el contenido:

input {
  beats {
    port => 5443
    type => syslog
    ssl => true
    ssl_certificate => "/etc/logstash/logstash.crt"
    ssl_key => "/etc/logstash/logstash.key"
  }
}

Como vemos ponemos a escuchar el logstash por el puerto 5443 y los logs de tipo syslogs con la configuración de ssl de certificado y llave respectivamente para su comunicación segura desde los equipos remotos.
Crearemos también el fichero de filtro para syslog de la siguiente manera:

#nano /etc/logstash/conf.d/syslog-filter.conf

Conteniendo lo siguiente

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

Lo que esta en negrilla es plugin llamado grok para hacer el parseo de los ficheros.

Ahora crearemos el fichero output de logstash para su salida a elasticsearch, donde crea el indice.

#nano /etc/logstash/conf.d/output-elasticsearch.conf

Con el contenido:

output {
if [host] == "mail" {
   elasticsearch {
    hosts => ["localhost:9200"]
    hosts => "localhost:9200"
    manage_template => false
    index => "correo-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
   }
 }
}

Ponemos la condicional en el host para más adelante en los artículos para los logs de squid, nginx y pfsense poder tener separados los índices por cada host en cuestión ya que sería muy engorroso tener todos los parámetros a medir de cada uno de ellos en un solo indice.
Seguimos preparando el terreno. Vamos a preparar logstash para almacenar la geolocalización IP y más adelante le mostraremos cómo crear un geo-mapeado visual de las direcciones IP basado en los datos procesados, utilizando una base de datos GeoIP con Elasticsearch, Logstash, Kibana y Grafana. Lo primero que debemos hacer es descargarnos la base datos.
Creamos el directorio donde alojaremos la base datos

#mkdir /etc/logstash/geoip
#cd /etc/logstash/geoip

y descargamos la base de datos

#wget -t0 -c http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz

Lo descompactamos

#tar -xf GeoLite2-City.tar.gz

Y movemos el fichero de la carpeta GeoLite2-City_(day) para /etc/logstash/geoip donde “day” es la fecha de la base en que se actualizo la base de datos.

#mv /etc/logstash/geoip/GeoLite2-City_(day)/GeoLite2-City.mmdb /etc/logstash/geoip

Ya tenemos la base de datos de geolocalización lista.
Vamos ahora a copiar los filtros necesarios de zimbra y los patrones de los cuales se van a auxiliar con el plugin grok para a partir de los mismos para hacer el parseo de los logs de zimbra.
Vamos a la carpeta donde clonamos con el git el repositorio de input, filtros, patrones y outputs.

#cd /root/zimbra_logstash/conf.d
#cp 10-filter-postfix.conf 15-filter-sasl.conf 16-filter-amavis.conf 17-filter-nginx.conf 18-filter-zimbra.conf 19-filter-mutate.conf etc/logstash/conf.d

Copiamos la carpeta donde vienen definidos los patrones para la configuración de logstash.

cp -R /root/zimbra_logstash/patterns /etc/logstash

Ahora modificaremos el fichero 10-filter-postfix.conf agregando estas lineas para usar la geolocalización de origen y destino de los mensajes de correo. Sujerencia para no equivocarse copiar lo siguente antes del ultimo corchete.

if [src_relayip] {
    geoip {
      source => "src_relayip"
      target => "geoip"
      database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
      fields => [ "country_code2", "country_name" ]
      fields => [ "latitude", "longitude" ]
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
      add_tag => "geoip"
    }
     mutate {
        convert => [ "[geoip][location]", "float"]
     }
   }
 if [dst_relayip] {
    geoip {
      source => "dst_relayip"
      target => "geoip"
      database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
      fields => [ "country_code2", "country_name" ]
      fields => [ "latitude", "longitude" ]
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
      add_tag => "geoip"
    }
     mutate {
        convert => [ "[geoip][location]", "float"]
     }
   }

Ya casi tenemos todo listo pero nos falta aún pasarle la plantilla al elasticsearch, para el acomodo de algunos tipos de datos del índice. Como el de geoip para poder hacer bien los mapas de geolocalización. Esta plantilla la podemos pasar mediante un archivo desde consola o mediante la web del kibana. Vamos a ver las dos variantes ustedes elegirán la que más cómoda le sea.
Variante consola.

#mkdir /root/templates/
#cd /root/templates/
#nano correo-filebeat.json

Con el siguiente contenido:

{
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": true,
        "norms": false
      },
      "dynamic_templates": [
        {
          "template1": {
            "mapping": {
              "doc_values": true,
              "ignore_above": 1024,
              "index": "not_analyzed",
              "type": "{dynamic_type}"
            },
            "match": "*"
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "message": {
          "type": "text",
          "index": "true"
        },
        "offset": {
          "type": "long",
          "doc_values": "true"
        },
      "port": {
          "type": "integer",
          "index": "false"
      },
      "tls_info": {
          "type": "text",
          "index": "false"
      },
      "geoip" : {
        "dynamic" : true,
        "type" : "object",
        "properties" : {
          "ip" : {
            "type" : "ip",
            "doc_values" : true
          },
          "latitude" : {
            "type" : "float",
            "doc_values" : true
          },
          "location" : {
            "type" : "geo_point",
            "doc_values" : true
          },
          "longitude" : {
            "type" : "float",
            "doc_values" : true
          },
          "country_code2" : {
            "type" : "text",
            "index": "false"
          },
          "country_name" : {
            "type" : "text",
            "index": "false"
          }
        }
      }
      }
    }
  },
  "settings": {
    "index.refresh_interval": "5s"
  },
  "template": "correo-filebeat-*"
}

Ahora podemos pasarle la plantilla a elasticsearch de la siguiente manera:

#curl -XPUT "http://localhost:9200/_template/correo-filebeat?pretty" -d @correo-filebeat.json

Si devuelve lo siguiente es que todo fue bien

{
  "acknowledged" : true
}

Mediante la web de Kibana vamos a ir a Dev Tools y en la consola ponemos lo siguiente:

PUT _template/correo-filebeat
{
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": true,
        "norms": false
      },
      "dynamic_templates": [
        {
          "template1": {
            "mapping": {
              "doc_values": true,
              "ignore_above": 1024,
              "index": "not_analyzed",
              "type": "{dynamic_type}"
            },
            "match": "*"
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "message": {
          "type": "text",
          "index": "true"
        },
        "offset": {
          "type": "long",
          "doc_values": "true"
        },
      "port": {
          "type": "integer",
          "index": "false"
      },
      "tls_info": {
          "type": "text",
          "index": "false"
      },
      "geoip" : {
        "dynamic" : true,
        "type" : "object",
        "properties" : {
          "ip" : {
            "type" : "ip",
            "doc_values" : true
          },
          "latitude" : {
            "type" : "float",
            "doc_values" : true
          },
          "location" : {
            "type" : "geo_point",
            "doc_values" : true
          },
          "longitude" : {
            "type" : "float",
            "doc_values" : true
          },
          "country_code2" : {
            "type" : "text",
            "index": "false"
          },
          "country_name" : {
            "type" : "text",
            "index": "false"
          }
        }
      }
      }
    }
  },
  "settings": {
    "index.refresh_interval": "5s"
  },
  "template": "correo-filebeat-*"
}

Le damos ejecutar y a la derecha nos saldrá si todo esta bien lo mismo que por la consola de comandos.

{
  "acknowledged" : true
}


Reiniciamos el servicio logstash para que adquiera la nueva configuración

#service logstash restart

Verificamos inició bien.

#tail -100f /var/log/logstash/logstash-plain.log

[2017-06-07T21:00:02,920][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Ahora estamos listos para configurar el filebeat en el equipo remoto o sea el servidor de correo zimbra.
En el servidor de correo editamos el fichero filebeat.yml.
Como generamos para logstash un certificado debemos copiarselo también al servidor zimbra lo pondremos en la misma carpeta de filebeat en /etc/filebeat/ el fichero logstash.crt. Esto garantiza que la comunicación sea segura entre filebeat de zimbra y logstash como hemos mencionado anteriormente.

#nano /etc/filebeat/filebeat.yml

En los prospectors editamos de la siguente manera

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log
  document_type: postfix
  paths:
    - /var/log/mail.log
#    - /var/log/zimbra.log
- input_type: log
  document_type: zimbra_audit
  paths:
    - /opt/zimbra/log/audit.log
- input_type: log
  document_type: zimbra_mailbox
  paths:
    - /opt/zimbra/log/zmmailboxd.out
- input_type: log
  document_type: nginx
  paths:
    - /opt/zimbra/log/nginx.access.log

Y en Logstash output se define a que servidor logstash le va a entregar el filebeat los logs de zimbra

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["equipo.dominio.cu:5443"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  bulk_max_size: 2048
  ssl.certificate_authorities: ["/etc/filebeat/logstash.crt"]
  template.name: "filebeat"
  template.path: "filebeat.template.json"
  template.overwrite: false
  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

Riniciamos el servicio de filebeat

#service filebeat restart

Verificamos el estado del servicio

#service filebeat status

● filebeat.service - filebeat
   Loaded: loaded (/lib/systemd/system/filebeat.service; disabled; vendor preset: enabled)
   Active: active (running) since vie 2017-06-16 21:25:19 CDT; 2 days ago
     Docs: https://www.elastic.co/guide/en/beats/filebeat/current/index.html
 Main PID: 30975 (filebeat)
   CGroup: /system.slice/filebeat.service
           └─30975 /usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat

Warning: Journal has been rotated since unit was started. Log output is incomplete or unavailable.
Ya estamos listos para obtener nuestros primeros logs. En Kibana vamos a Management/Index Patterns y presionamos el signo más(+). Como el patron en el output es correo-filebeat-fecha visto anteriormente en la salida ponemos correo-filebeat-* ya que los indices se generan diariamente y el uso del wildcard “*” nos permitirá estar leyendo los indices que se generan de manera diaria.

Presionamos el boton Create y ya veremos que empiezan a aparecer etiquetas definidas en los filter con el plugin grok.

Vamos a Discover y veremos ya como fluyen nuestros logs. A nuestra izquierda veremos las etiquetas que generaron los filtros y a la derecha veremos la captura del mensaje de log con las etiquetas marcadas.

Me he decidido por Grafana para generar los Dashboards porque a mi juicio, esta mucho más completo en este sentido y solo he usado Kibana para las operaciones que mencioné al inicio de este artículo.
Teniendo grafana ya funcional vamos a configurar el fichero de elasticsearch para que permita conexiones a los indices desde otra dirección ip y no solo desde localhost.

#nano /etc/elasticsearch/elasticsearch.yml

Añadimos al final del fichero lo siguiente:

http.host: 0.0.0.0
http.cors.allow-origin: "*"
http.cors.enabled: true

Reiniciamos el servicio de elasticsearch.

#service elasticsearch restart

Ahora podremos agregar a Grafana el Data Source de elasticsearch.

Marcamos añadir origen de datos (+add data source). Nombramos el data source, elegimos el tipo en este caso elasticsearch, la url del servidor elasticsearch con el puerto por defecto 9200, Access tipo proxy, El nombre del indice que es el formato del nombre del indice generado por logstash [correo-filebeat-]YYYY.MM.DD con el patron diario (Daily) y finalmente elegimos la versión de elasticsearch que en nuestro caso es 5.x.

Salvamos la configuración y probamos.

Ya podemos crear nuestro primer Dashboard.

Decidimos crear un gráfico

Editamos el panel.

En el panel del origen de datos(panel data source) elegimos correo-filebeat como llamamos al datasource y seleccionamos “+Add query” y ya veremos datos en la gráfica.

Podemos instalar plugins muy interesantes en Grafana que nos ayudaran a crear cada vez mejores Dashboards.

Vamos a ver varios ejemplos de plugins.

Worldmap Panel

Vamos a editarlo

Agregamos el panel de origen de datos en este caso la conexión que establecimos a elasticsearch

Como anteriormente en la plantilla que le establecimos elasticsearch indicabamos el campo geoip.location como de tipo geoip y en el filtro de postfix para las ip de origen y destino almacenamos la localización de las mismas en este campo vamos a ver como se definen las consultas en este tipo de plugin.

Ahora vamos a las opciones del mapa y establecemos los parametros del nombre del campo a mostrar y del campo geo_point en el mapa.

Entonces ya veremos los puntos en el mapa

Table Panel

De la misma forma agregamos una fila al dashboard (+ADD ROW) y seleccionamos el plugin Table y lo editamos. Seleccionamos como explicamos el panel de datos elasticsearch que definimos en el origen de datos.

Por defecto la metrica va ser de tipo Count aunque hay variedad de ellas. Vamos a agrupar por termino y en este caso elegiremos to_domain una etiqueta definida en nuestros filtros de logstash. Le diremos que cuente dos 10 primeros.

Definiremos ahora las columnas porque como vemos viene en formato JSON.

Y tambien podemos definir el formato de las columnas

Pie Chart

Vamos a establecer el termino que vamos a mostrar en el pastel. En este caso el campo result.

En la pestaña options debinimos el tipo de datos en Unit y el Valor que en este caso será Total.

Vamos a ver los templates ahora muy útiles en los Dashboards. En las opciones del Dashboard podemos agregar los templates.

Creamos un nuevo template.

Como vemos definimos el nombre del template en este caso src_domain de tipo query especificando el data source definido correo-filebeat y definimos el query buscar el termino from_domain de la siguente manera {“find”: “terms”, “field”: “from_domain”}. Tambien definimos multi-value para que nos permita selección multiple y la opción All.

Ahora tendremos que acotar las queries cada elemento de los Dashboards para que el template cumpla su función.

Esto se lo hacemos a cada uno de los Dashboard.

Aqui vemos el resultado:

 

Teniendo estos plugins podremos lograr cosas muy intersantes. Espero les sirva.

2 Comments

Dejar una contestacion

Tu dirección de correo electrónico no será publicada.


*