Get to the Bucket – Part 2

Intro

I don’t want to manage an Elasticsearch cluster. It was one of the main reasons I joined CHAOSSEARCH. To be rid of the accidental complexity of ES, and help others do the same. But my job is as an SRE, and logs will be created, those logs need to be searched, and that data needs to be stored somewhere. So let’s hack on Filebeat to quickly get data into S3 so it can be indexed by CHAOSSEARCH, and try to avoid resorting to a fat client like Logstash or Fluentd.

Part two in a multi-part series.

Find part one here.

Part Two: Hacking Filebeat to push to S3

In this blog post, we hack on Filebeat to use S3 as an output. I’d be remiss not to thank the several projects / codebases I used as reference material and inspiration.

tl;dr: Hack Filebeat’s interval rotator to push logs to S3 before deleting them.

But before we get there…

On running support infrastructure

The point of what we’re doing at CHAOSSEARCH is to make storing and searching data cheaper in dollars and compute. We store on object storage (S3) because of price and reliability. We run containers because they are dense, flexible, and easy to deploy. We run docker swarm because it’s lightweight. We monitor with netdata and custom lambdas so we don’t have to dedicate a whole set of servers to monitoring our infrastructure. We dedicate as much of our IT spend as possible to delivering the product and avoid as much accidental service sprawl as we can.

Our core product makes S3 full text searchable. All of our support applications generate JSON logs and status data. That data is dumped into one of several S3 buckets where it is indexed for value add and observability. It’s an entire ecosystem built around S3, JSON, containers, Kibana, and server-less.

Why not a fat client?

How many containers do you run on a host? Maybe you don’t run containers; feel free to substitute processes, or VMs, or jails, or whatever you use to deploy code. How large are your instances? Remember that each host has a finite amount of ram, and every log, monitoring, or support agent you run per host uses compute and ram (read that as money). That is money not directly going towards providing service to the customer. Imagine it as “We pay 70 dollars a month for this instance, but we only get 56 dollars worth of work out of it because 20% of its resources go to logging and monitoring agents.” Obviously there is some hyperbole in there; I’m not advocating for zero monitoring or logging agents, since you can’t run a black box, but you should think about the overhead of support agents and how it affects your deployments.

Let’s start with Logstash, a fine, robust log shipper with a million plugins. Running it locally, it weighs in at a portly 900 megs of ram. Fluentd comes in around 450. Those ram values are multiples of each other, which is convenient for the author. If you have 50 hosts, 50 * 900 = 45,000 megs of ram are dedicated to Logstash log shipping (or chop that in half if you’re using Fluentd). 45,000 megs of ram is a fair bit. That’s ~45 gigs. That’s like 3 m5.xlarges worth of ram. 3 m5.xlarges cost ~420 dollars a month to run. Can you think of something in your AWS spend you’d rather put that 420 bucks toward? It may be completely worthwhile for your workload to use 900 megs an instance for log shipping, and I won’t dispute that. You know your workload, I do not. Maybe you run 100 containers per instance on fewer, larger instances, and 900 megs is in the noise. Maybe it’s better to distribute log parsing across all your instances instead of in a centralized place (for scalability). Maybe it just works, and you don’t want to touch it. But maybe it’s a waste of money to burn almost a gig of ram an instance, and maybe you want to allocate that spend to something else.

Here at CHAOS, we’re running containers on smaller instance types for better cost granularity, so 900 megs per host isn’t feasible. Neither is 450. The Python script from the previous post weighed in at 50 megs. That was a great footprint, but not really up to snuff reliability-wise for production. We needed to split the difference: something robust, with a limited ram footprint, that handles error cases like S3 write problems.

Enter Filebeat

Filebeat uses less ram than Logstash and Fluentd. Of course it uses less ram; it’s written in golang instead of JRuby. Underneath Filebeat is a library called Libbeat. Libbeat powers the shipping and collection for a whole host of different tools that gather and ship log data, metrics, status, and pretty much any other data that can be turned into JSON objects and stored in Elasticsearch. Libbeat has a number of “outputs” it can ship data through, but S3 is not one of them. Some light googling turned up Jeffery Hardy’s pull request trying to get an S3 output added to Libbeat, but that code was never merged.

The Filebeat hack

I knew I wanted to try to use Filebeat to ship logs and Metricbeat to ship metrics, and I knew I didn’t want to run Logstash ingestors, Kafka endpoints, or any extra servers if I could avoid it. Filebeat has a file output, which rotates on size and keeps n backups. I wondered what I would find in that file output code…

Log files based on time, not size

If you look at the Rotator struct, the object the file output writes through, it already refers to interval rotation, but there is no way to set an interval from the config file.

libbeat/common/file/rotator.go

// Rotator is a io.WriteCloser that automatically rotates the file it is
// writing to when it reaches a maximum size and optionally on a time interval
// basis. It also purges the oldest rotated files when the maximum number of
// backups is reached.
type Rotator struct {
        filename        string
        maxSizeBytes    uint
        maxBackups      uint
        permissions     os.FileMode
        log             Logger // Optional Logger (may be nil).
        interval        time.Duration
        intervalRotator *intervalRotator // Optional, may be nil
        redirectStderr  bool

        file  *os.File
        size  uint
        mutex sync.Mutex
}
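
The rotator is configured through functional options like file.MaxBackups and file.Permissions, which the diff below also passes in. Paraphrasing the upstream code from memory (a sketch, not a verbatim quote), the Interval option boils down to something like this, where RotatorOption is a func(*Rotator):

// Sketch, paraphrased from the upstream functional-options pattern,
// not the verbatim libbeat code.
func Interval(d time.Duration) RotatorOption {
    return func(r *Rotator) {
        r.interval = d
    }
}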

If we manage to pass an interval when we initialize the output, the code already exists to handle creating new time-based files on write. Let’s try it and test.

This two-line change imports the time library and passes 60 seconds as the interval:

diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go
index ab5b040be..f2c0a3505 100644
--- a/libbeat/outputs/fileout/file.go
+++ b/libbeat/outputs/fileout/file.go
@@ -28,6 +28,7 @@ import (
        "github.com/elastic/beats/libbeat/outputs"
        "github.com/elastic/beats/libbeat/outputs/codec"
        "github.com/elastic/beats/libbeat/publisher"
+       "time"
 )

 func init() {
@@ -85,6 +86,7 @@ func (out *fileOutput) init(beat beat.Info, c config) error {
                file.MaxBackups(c.NumberOfFiles),
                file.Permissions(os.FileMode(c.Permissions)),
                file.WithLogger(logp.NewLogger("rotator").With(logp.Namespace("rotator"))),
+               file.Interval(time.Duration(60)*time.Second),
        )
        if err != nil {
                return err

Apply this patch set, cd into the filebeat directory, and run make filebeat. The following stripped-down config takes stdin and outputs to /tmp:

filebeat.inputs:
- type: stdin

output.file:
  path: /tmp/filebeat_log
  filename: filebeat
  number_of_files: 7

Filebeat now creates JSON objects from standard input and writes them to /tmp/filebeat_log/filebeat. After one minute, the files are rotated with a timestamp: /tmp/filebeat_log/filebeat-2019-02-26-22-55-1, /tmp/filebeat_log/filebeat-2019-02-26-22-56-1, etc. Wonderful.

Hijacking file rotation

Filebeat is now rotating output files every 60 seconds. It has baked-in support for keeping only N output files, and I’m betting we can piggyback on the rotation code to push the time-based files to S3.

This patch is much more involved than the rotator hack, but it is pretty straightforward.

diff --git a/libbeat/common/file/rotator.go b/libbeat/common/file/rotator.go
index fb88481fd..58f98ff53 100644
--- a/libbeat/common/file/rotator.go
+++ b/libbeat/common/file/rotator.go
@@ -25,6 +25,18 @@ import (
    "time"

    "github.com/pkg/errors"
+
+   "bytes"
+   "net/http"
+
+   "github.com/aws/aws-sdk-go/aws"
+   "github.com/aws/aws-sdk-go/aws/session"
+   "github.com/aws/aws-sdk-go/service/s3"
+)
+
+const (
+    S3_REGION = "us-east-1"
+    S3_BUCKET = "cs-pmf-test-bucket"
 )

 // MaxBackupsLimit is the upper bound on the number of backup files. Any values
@@ -139,7 +151,6 @@ func NewFileRotator(filename string, options ...RotatorOption) (*Rotator, error)
        maxSizeBytes: 10 * 1024 * 1024, // 10 MiB
        maxBackups:   7,
        permissions:  0600,
-       interval:     0,
    }

    for _, opt := range options {
@@ -309,24 +320,73 @@ func (r *Rotator) purgeOldBackups() error {
    return r.purgeOldSizedBackups()
 }

+//
+// added function
+//
+
+// AddFileToS3 will upload a single file to S3, it will require a pre-built aws session
+// and will set file info like content type and encryption on the uploaded file.
+func AddFileToS3(s *session.Session, fileDir string) error {
+
+    // Open the file for use
+    file, err := os.Open(fileDir)
+    if err != nil {
+        return err
+    }
+    defer file.Close()
+
+    // Get file size and read the file content into a buffer
+    fileInfo, _ := file.Stat()
+    var size int64 = fileInfo.Size()
+    buffer := make([]byte, size)
+    file.Read(buffer)
+
+    // Config settings: this is where you choose the bucket, filename, content-type etc.
+    // of the file you're uploading.
+    _, err = s3.New(s).PutObject(&s3.PutObjectInput{
+        Bucket:               aws.String(S3_BUCKET),
+        Key:                  aws.String(fileDir),
+        ACL:                  aws.String("private"),
+        Body:                 bytes.NewReader(buffer),
+        ContentLength:        aws.Int64(size),
+        ContentType:          aws.String(http.DetectContentType(buffer)),
+        ContentDisposition:   aws.String("attachment"),
+        ServerSideEncryption: aws.String("AES256"),
+    })
+    return err
+}
+
 func (r *Rotator) purgeOldIntervalBackups() error {
    files, err := filepath.Glob(r.filename + "*")
    if err != nil {
        return errors.Wrap(err, "failed to list existing logs during rotation")
    }

-   if len(files) > int(r.maxBackups) {
+   //if len(files) > int(r.maxBackups) {
+   if len(files) > 2 {

        // sort log filenames numerically
        r.intervalRotator.SortIntervalLogs(files)

-       for i := len(files) - int(r.maxBackups) - 1; i >= 0; i-- {
+       sesh, sesh_err := session.NewSession(&aws.Config{Region: aws.String(S3_REGION)})
+       if sesh_err != nil {
+               errors.Wrapf(sesh_err, "New s3 Session Error: %v", S3_REGION)
+       }
+
+       //for i := len(files) - int(r.maxBackups) - 1; i >= 0; i-- {
+       for i := len(files) - 1; i >= 0; i-- {
+
            f := files[i]
            _, err := os.Stat(f)
            switch {
            case err == nil:
-               if err = os.Remove(f); err != nil {
-                   return errors.Wrapf(err, "failed to delete %v during rotation", f)
+               s3err := AddFileToS3(sesh, f)
+               if s3err != nil {
+                       errors.Wrapf(s3err, "failed to upload %v during rotation", f)
+               } else {
+                   if err = os.Remove(f); err != nil {
+                       return errors.Wrapf(err, "failed to delete %v during rotation", f)
+                   }
                }
            case os.IsNotExist(err):
                return errors.Wrapf(err, "failed to delete non-existent %v during rotation", f)
@@ -340,14 +400,24 @@ func (r *Rotator) purgeOldIntervalBackups() error {
 }

 func (r *Rotator) purgeOldSizedBackups() error {
+   sesh, sesh_err := session.NewSession(&aws.Config{Region: aws.String(S3_REGION)})
+   if sesh_err != nil {
+           errors.Wrapf(sesh_err, "New s3 Session Error: %v", S3_REGION)
+   }
+
    for i := r.maxBackups; i < MaxBackupsLimit; i++ {
        name := r.backupName(i + 1)

        _, err := os.Stat(name)
        switch {
        case err == nil:
-           if err = os.Remove(name); err != nil {
-               return errors.Wrapf(err, "failed to delete %v during rotation", name)
+           s3err := AddFileToS3(sesh, name)
+           if s3err != nil {
+                   errors.Wrapf(s3err, "failed to upload %v during rotation", name)
+           } else {
+               if err = os.Remove(name); err != nil {
+                   return errors.Wrapf(err, "failed to delete %v during rotation", name)
+               }
            }
        case os.IsNotExist(err):
            return nil

In summary, this patch:

  • Includes the libraries needed to push to S3
  • Hardcodes the S3 bucket and region Filebeat will push to
  • Adds a function for pushing a file to S3 (h/t Edd Turtle); a lower-memory streaming alternative is sketched after this list
  • Hacks the time-based and size-based purge functions to push each file to S3 before deleting it, and hardcodes rotation to kick in once there are more than two files in the output directory
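
One caveat worth calling out: AddFileToS3 reads the whole file into a buffer before uploading, which cuts against the low-ram goal of this exercise. A lower-memory sketch using the SDK’s s3manager uploader, which streams the file in parts, might look like this (streamFileToS3 is my name and not part of the patch; it reuses the patch’s S3_BUCKET constant):

import (
    "os"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// streamFileToS3 is a sketch, not part of the patch: upload a rotated log
// file by streaming it through s3manager instead of buffering it in memory.
func streamFileToS3(sess *session.Session, path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    uploader := s3manager.NewUploader(sess)
    _, err = uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(S3_BUCKET),
        Key:    aws.String(path),
        Body:   f, // read in chunks by the uploader, not held fully in memory
    })
    return err
}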

Running this monstrosity

The Amazon Go SDK resolves credentials by checking environment variables, then ~/.aws/credentials, then an IAM role. We are just piggybacking on that behavior and not modifying the code that reads the Filebeat configuration. For testing, it uses the testing profile I have configured in ~/.aws/credentials.
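
If you’d rather pin the profile in code than rely on the AWS_PROFILE environment variable, the SDK supports that too. A minimal sketch (newS3Session is my name, not something in the patch):

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
)

// newS3Session is a sketch, not part of the patch: build a session from a
// named profile in ~/.aws/credentials instead of the AWS_PROFILE env var.
func newS3Session(profile, region string) (*session.Session, error) {
    return session.NewSessionWithOptions(session.Options{
        Profile:           profile, // e.g. "testing"
        SharedConfigState: session.SharedConfigEnable,
        Config:            aws.Config{Region: aws.String(region)},
    })
}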

To get a real amount of data, I’m going to pull logs from my local docker containers. This Filebeat config will get all the docker logs, and add all the metadata about the host and containers it can:

  • Create fb-s3.yml
# fb-s3.yml
filebeat.autodiscover:
  providers:
    - type: docker
      templates:
        - condition:
            regexp:
              docker.container.name: ".*"
          config:
            - type: docker
              containers.ids:
                - "${data.docker.container.id}"
              processors:
                - add_docker_metadata: ~
                - add_host_metadata: ~
                - add_cloud_metadata: ~
              json.keys_under_root:   true
              json.add_error_key:     true
output.file:
  path: tmp/filebeat_log
  filename: filebeat
  number_of_files: 2
logging.level: info
  • Run it:
    export AWS_PROFILE=testing
    ./filebeat -c fb-s3.yml
  • Check S3 bucket:
    export AWS_PROFILE=testing
    aws s3 ls cs-pmf-test-bucket --recursive
2019-02-26 15:31:38      59465 tmp/filebeat_log/filebeat-2019-02-26-20-29-1
2019-02-26 15:31:38     120175 tmp/filebeat_log/filebeat-2019-02-26-20-30-1
2019-02-26 16:00:31      84450 tmp/filebeat_log/filebeat-2019-02-26-20-31-1
2019-02-26 16:00:31     117935 tmp/filebeat_log/filebeat-2019-02-26-20-32-1
2019-02-26 16:00:30     128020 tmp/filebeat_log/filebeat-2019-02-26-20-34-1
2019-02-26 17:57:07     127964 tmp/filebeat_log/filebeat-2019-02-26-21-00-1
2019-02-26 17:56:58     127896 tmp/filebeat_log/filebeat-2019-02-26-21-11-1
2019-02-26 17:56:52       7162 tmp/filebeat_log/filebeat-2019-02-26-22-50-1
2019-02-26 18:00:36      75666 tmp/filebeat_log/filebeat-2019-02-26-22-51-1
2019-02-26 18:00:34      79484 tmp/filebeat_log/filebeat-2019-02-26-22-53-1
2019-02-26 18:00:32      37650 tmp/filebeat_log/filebeat-2019-02-26-22-54-1

And it works. It works really well, actually. There’s some weirdness to unpack, like reconciling the file timestamps with the upload timestamps. My guess is that has to do with log rotation only being evaluated on write, or maybe slow uploads (or maybe application restarts). There’s more to do, but the point here was not to build the most wonderfully robust log shipper on the planet; it was to prove out that we could make Libbeat ship logs to S3. We managed to do that in less than a hundred lines of Go.

The future of the Filebeat hack

This isn’t production-level code, but it is a good POC. You could run it for local testing. To productize the changes, the hardcoded settings need to move into the config file, and the whole thing should become a beat module.
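
As a rough illustration of the “settings in the config file” part, here is a hypothetical sketch of an S3 settings block for the fileout output. The names are mine and none of this is merged code; libbeat binds YAML settings to struct fields with config struct tags like these:

// Hypothetical settings struct; field and setting names are illustrative only.
type s3Settings struct {
    Enabled bool   `config:"enabled"`
    Bucket  string `config:"bucket"`
    Region  string `config:"region"`
}

// Embedded in the fileout config struct as something like:
//   S3 s3Settings `config:"s3"`
// which would let output.file carry an s3: block with enabled, bucket,
// and region keys alongside path, filename, and number_of_files.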

One discussion sparked by the POC is the current pattern of augmenting every log line with metadata. With container and host metadata enabled, a one-line log message generated a 130-line JSON object. It seems awfully wasteful to duplicate so much content in your logs, but there is an upside to all that repeated data…

Log data compresses really well! Especially log data with a lot of redundant strings. Any attempt to use this code in production should probably include compression of some sort before pushing to S3. The fritzhardy patch set includes some code to do that, but it was omitted here for brevity.

Compressibility

$ du -h filebeat
128K    filebeat
$ gzip filebeat
$ du -h filebeat.gz
12K filebeat.gz
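
Given a ratio like that, wiring compression into the hack is an obvious win. Here is a minimal sketch of gzipping a rotated file in memory before pushing it, assuming the same S3_BUCKET constant and session handling as AddFileToS3 above (this is an illustration, not the fritzhardy compression code; the .gz key suffix and content headers are my additions):

import (
    "bytes"
    "compress/gzip"
    "io/ioutil"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

// addGzippedFileToS3 is a sketch, not part of the patch: compress the file
// into an in-memory buffer with gzip, then push the compressed bytes to S3.
func addGzippedFileToS3(s *session.Session, path string) error {
    raw, err := ioutil.ReadFile(path)
    if err != nil {
        return err
    }

    var compressed bytes.Buffer
    gz := gzip.NewWriter(&compressed)
    if _, err := gz.Write(raw); err != nil {
        return err
    }
    if err := gz.Close(); err != nil {
        return err
    }

    _, err = s3.New(s).PutObject(&s3.PutObjectInput{
        Bucket:               aws.String(S3_BUCKET),
        Key:                  aws.String(path + ".gz"),
        ACL:                  aws.String("private"),
        Body:                 bytes.NewReader(compressed.Bytes()),
        ContentType:          aws.String("application/gzip"),
        ContentEncoding:      aws.String("gzip"),
        ServerSideEncryption: aws.String("AES256"),
    })
    return err
}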

Who’s going to productize this work? Probably us. Probably me, specifically. Maybe I’m already fairly far along. There will be a git repo once it’s proved out and the code is made into a module. But in the meantime, we wanted to share what we learned doing our POC.

Wrapping up

It was a pretty straightforward change, but a pretty powerful one. The Libbeat codebase is really clean: a bit confusing in places, but easy to work with. It would have been wonderful for Elastic to have accepted the fritzhardy patch when he offered it, but it was rejected. Their rationale was that they had enough complexity in their codebase and didn’t want to include another output. Since then, upstream has drifted enough that the patch wouldn’t apply cleanly anymore, so we had to hack this out instead.

Questions? Comments? Want to collaborate? Hit me up: @platformpatrick

Preview: In the next post, we’re going to talk about Lambdas and CloudWatch.