Skip to content
This repository was archived by the owner on May 31, 2023. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion logspout.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,47 @@ import (
"log"
"net"
"time"
"strconv"
"os"

"github.com/gliderlabs/logspout/router"
)

const defaultRetryCount = 10

var (
retryCount uint
econnResetErrStr string
)

func setRetryCount() {
if count, err := strconv.Atoi(getopt("RETRY_COUNT", strconv.Itoa(defaultRetryCount))); err != nil {
retryCount = uint(defaultRetryCount)
} else {
retryCount = uint(count)
}
debug("setting retryCount to:", retryCount)
}

func getopt(name, dfault string) string {
value := os.Getenv(name)
if value == "" {
value = dfault
}
return value
}

func debug(v ...interface{}) {
if os.Getenv("DEBUG") != "" {
log.Println(v...)
}
}

// FluentdAdapter is an adapter for streaming JSON to a fluentd collector.
type FluentdAdapter struct {
conn net.Conn
route *router.Route
transport router.AdapterTransport
}

// Stream handles a stream of messages from Logspout. Implements router.logAdapter.
Expand Down Expand Up @@ -41,9 +74,83 @@ func (adapter *FluentdAdapter) Stream(logstream chan *router.Message) {
_, err = adapter.conn.Write(json)
if err != nil {
log.Println("fluentd-adapter: ", err)
continue
adapter.retry(json, err)
}
}
}

func (adapter *FluentdAdapter) retry(json []uint8, err error) error {
if opError, ok := err.(*net.OpError); ok {
if (opError.Temporary() && opError.Err.Error() != econnResetErrStr) || opError.Timeout() {
retryErr := adapter.retryTemporary(json)
if retryErr == nil {
return nil
}
}
}
if reconnErr := adapter.reconnect(); reconnErr != nil {
return reconnErr
}
if _, err = adapter.conn.Write(json); err != nil {
log.Println("fluentd: reconnect failed")
return err
}
log.Println("fluentd: reconnect successful")
return nil
}

func (adapter *FluentdAdapter) retryTemporary(json []uint8) error {
log.Printf("fluentd: retrying tcp up to %v times\n", retryCount)
err := retryExp(func() error {
_, err := adapter.conn.Write(json)
if err == nil {
log.Println("fluentd: retry successful")
return nil
}

return err
}, retryCount)

if err != nil {
log.Println("fluentd: retry failed")
return err
}

return nil
}

func (adapter *FluentdAdapter) reconnect() error {
log.Printf("fluentd: reconnecting up to %v times\n", retryCount)
err := retryExp(func() error {
conn, err := adapter.transport.Dial(adapter.route.Address, adapter.route.Options)
if err != nil {
return err
}
adapter.conn = conn
return nil
}, retryCount)

if err != nil {
return err
}
return nil
}

func retryExp(fun func() error, tries uint) error {
try := uint(0)
for {
err := fun()
if err == nil {
return nil
}

try++
if try > tries {
return err
}

time.Sleep((1 << try) * 10 * time.Millisecond)
}
}

// NewFluentdAdapter creates a Logspout fluentd adapter instance.
Expand All @@ -62,9 +169,11 @@ func NewFluentdAdapter(route *router.Route) (router.LogAdapter, error) {
return &FluentdAdapter{
conn: conn,
route: route,
transport: transport,
}, nil
}

func init() {
router.AdapterFactories.Register(NewFluentdAdapter, "fluentd-tcp")
setRetryCount()
}