You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

147 lines
3.0 KiB

package logger
import (
"context"
"fmt"
"net"
"strconv"
"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// KafkaConfig holds the Kafka connection and topic settings used to set up
// the Kafka-backed logger.
type KafkaConfig struct {
// Addr lists the broker addresses, one "host:port" entry per element,
// e.g. {"localhost:9092", "localhost:9093", "localhost:9094"}.
// createTopic dials only the first entry; writer passes all of them to kafka.TCP.
Addr []string
// Topic is the name of the Kafka topic to create and to write log entries to.
Topic string
// NumPartitions is the intended number of partitions for the topic.
NumPartitions int
// ReplicationFactor is the intended replication factor for the topic.
ReplicationFactor int
}
// createTopic creates kc.Topic on the Kafka cluster through the cluster
// controller.
//
// It dials the first address in kc.Addr, asks that broker which node is the
// controller, and then sends the CreateTopics request over a second
// connection opened to the controller. NumPartitions and ReplicationFactor
// values that are not greater than 1 are sent as 1.
//
// It returns an error when kc.Addr is empty, when either connection cannot be
// established, when the controller lookup fails, or when topic creation fails.
func (kc *KafkaConfig) createTopic() error {
	// Guard the kc.Addr[0] access below so an empty config yields an error
	// rather than an index-out-of-range panic.
	if len(kc.Addr) == 0 {
		return errors.New("kafka address must be specified")
	}

	conn, err := kafka.Dial("tcp", kc.Addr[0])
	if err != nil {
		return errors.Wrap(err, "can't connect to Kafka")
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		return errors.Wrap(err, "cannot get controller")
	}

	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
	controllerConn, err := kafka.Dial("tcp", controllerAddr)
	if err != nil {
		return errors.Wrap(err, "can't connect to Kafka controller")
	}
	defer controllerConn.Close()

	replicationFactor := 1
	if kc.ReplicationFactor > 1 {
		replicationFactor = kc.ReplicationFactor
	}

	// The partition count must land in numPartitions; it previously
	// overwrote replicationFactor, leaving the topic with a single partition.
	numPartitions := 1
	if kc.NumPartitions > 1 {
		numPartitions = kc.NumPartitions
	}

	topicConfig := kafka.TopicConfig{
		Topic:             kc.Topic,
		NumPartitions:     numPartitions,
		ReplicationFactor: replicationFactor,
	}
	if err := controllerConn.CreateTopics(topicConfig); err != nil {
		return errors.Wrap(err, "can't create topics")
	}
	return nil
}
// writer builds an asynchronous kafka.Writer for kc.Topic that targets every
// address in kc.Addr and uses the LeastBytes balancer. Messages the writer
// emits through its ErrorLogger hook are forwarded to errLogger.
func (kc *KafkaConfig) writer(errLogger *zap.Logger) *kafka.Writer {
	w := new(kafka.Writer)
	w.Addr = kafka.TCP(kc.Addr...)
	w.Topic = kc.Topic
	w.Balancer = &kafka.LeastBytes{}
	w.Async = true
	// Adapt the zap logger to the Printf-style logger the writer expects.
	w.ErrorLogger = &errorLogger{Logger: errLogger}
	return w
}
// writerSyncer sends written bytes to Kafka through a kafka.Writer. It
// provides the Write and Sync methods and is passed to zapcore.NewCore as the
// log output in SetupDefaultKafkaLogger.
type writerSyncer struct {
// kwr is the Kafka writer that Write publishes messages with and Sync closes.
kwr *kafka.Writer
// errorLogger receives an error entry when publishing a message fails.
errorLogger *zap.Logger
// topic is the topic name set at construction; the methods visible in this
// file do not read it.
topic string
}
// Write publishes p to Kafka as the value of a single message.
//
// A failure reported by the Kafka writer is logged to ws.errorLogger together
// with the payload and is not returned to the caller: Write always returns
// len(p) and a nil error.
func (ws *writerSyncer) Write(p []byte) (int, error) {
	// Give the message its own copy of the bytes rather than aliasing p —
	// presumably because the caller may reuse p after Write returns.
	payload := make([]byte, len(p))
	copy(payload, p)

	if err := ws.kwr.WriteMessages(context.Background(), kafka.Message{Value: payload}); err != nil {
		ws.errorLogger.Error("Error writing log", zap.ByteString("log", p), zap.Error(err))
	}
	return len(p), nil
}
// Sync closes the underlying Kafka writer and returns the error from Close.
//
// NOTE(review): because this closes the writer rather than only flushing it,
// writes issued after Sync will presumably fail — confirm this is intended
// for callers that sync more than once.
func (ws *writerSyncer) Sync() error {
return ws.kwr.Close()
}
// errorLogger wraps a zap logger and adds a Printf method so that it can be
// assigned to the ErrorLogger field of a kafka.Writer.
type errorLogger struct {
*zap.Logger
}
// Printf formats msg with args in fmt.Sprintf style and records the result at
// error level on the embedded zap logger.
func (l *errorLogger) Printf(msg string, args ...interface{}) {
	formatted := fmt.Sprintf(msg, args...)
	l.Logger.Error(formatted)
}
// SetupDefaultKafkaLogger replaces the package default logger with one that
// writes info-level-and-above entries to the Kafka topic described by cfg.
//
// It first creates the topic, then builds a zap core whose output is a
// Kafka-backed writer. A separate logger obtained from initLogger receives
// Kafka write failures, and zap's own internal errors go to stderr. The
// resulting logger is named namespace and has caller annotation enabled.
//
// It returns an error when cfg.Addr is empty, when the topic cannot be
// created, or when stderr cannot be opened as zap's error output.
func SetupDefaultKafkaLogger(namespace string, cfg KafkaConfig) error {
	if len(cfg.Addr) == 0 {
		return errors.New("kafka address must be specified")
	}

	err := cfg.createTopic()
	if err != nil {
		return errors.Wrapf(err, "cannot create topic: %s", cfg.Topic)
	}

	// Logger that reports problems with the Kafka pipeline itself.
	fallback := initLogger(WithStringCasting())
	sink := &writerSyncer{
		kwr:         cfg.writer(fallback),
		errorLogger: fallback,
		topic:       cfg.Topic,
	}
	core := zapcore.NewCore(
		newStringCastingEncoder(zap.NewProductionEncoderConfig()),
		sink,
		zap.NewAtomicLevelAt(zap.InfoLevel),
	)

	stderrSink, _, err := zap.Open("stderr")
	if err != nil {
		return err
	}

	kafkaLogger := zap.New(core, zap.ErrorOutput(stderrSink), zap.AddCaller())
	defaultLogger = New(kafkaLogger.Named(namespace))
	return nil
}