You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
342 lines
10 KiB
342 lines
10 KiB
package lib
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Metrics struct {
|
|
StateHost StateHost
|
|
Connections int // количество соединений за весь период учета
|
|
Queue_AVG float32 // среднее количество запросов в очереди
|
|
Queue_QTL_80 float32 // квантиль 80% - какое среднее кол-во запросов до границы 80% в отсорованном ряду
|
|
Queue_QTL_90 float32 // квантиль 90%
|
|
Queue_QTL_99 float32 // квантиль 99%
|
|
TPR_AVG_MS float32 // (ms) Time per request - среднее время обработки запроса
|
|
TPR_QTL_MS_80 float32 // (ms) квантиль 80% - какое среднее время обработки запросов до границы 80% в отсорованном ряду
|
|
TPR_QTL_MS_90 float32 // (ms) квантиль 90%
|
|
TPR_QTL_MS_99 float32 // (ms) квантиль 99%
|
|
|
|
RPS int // Request per second - количество запросов в секунду
|
|
}
|
|
|
|
type serviceMetric struct {
|
|
Metrics
|
|
Stash Metrics // карман для сохранения предыдущего значения
|
|
connectionOpen int // текущее кол-во открытых соединений (+ при запрос - при ответе)
|
|
queue []int // массив соединений в очереди (не закрытых) см.выше
|
|
tpr []time.Duration // массив времен обработки запросов
|
|
mux *sync.Mutex
|
|
ctx context.Context
|
|
}
|
|
|
|
type ServiceMetric interface {
|
|
SetState()
|
|
SetConnectionIncrement()
|
|
SetConnectionDecrement()
|
|
SetTimeRequest(timeRequest time.Duration)
|
|
Generate()
|
|
Get() (result Metrics)
|
|
Clear()
|
|
SaveToStash()
|
|
Middleware(next http.Handler) http.Handler
|
|
}
|
|
|
|
func (s *serviceMetric) SetState() {
|
|
//s.mux.Lock()
|
|
//defer s.mux.Unlock()
|
|
|
|
s.StateHost.Tick()
|
|
|
|
return
|
|
}
|
|
|
|
// записываем время обработки запроса в массив
|
|
func (s *serviceMetric) SetTimeRequest(timeRequest time.Duration) {
|
|
go func() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.tpr = append(s.tpr, timeRequest)
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
// SetConnectionIncrement увеличиваем счетчик и добавляем в массив метрик
|
|
// формируем временной ряд количества соединений
|
|
// при начале запроса увеличиваем, при завершении уменьшаем
|
|
// запускаем в отдельной рутине, потому что ф-ция вызывается из сервиса и не должна быть блокирующей
|
|
func (s *serviceMetric) SetConnectionIncrement() {
|
|
go func() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.Connections = s.Connections + 1
|
|
s.connectionOpen = s.connectionOpen + 1
|
|
s.queue = append(s.queue, s.connectionOpen)
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
// SetConnectionDecrement уменьшаем счетчик и добавляем в массив метрик
|
|
// запускаем в отдельной рутине, потому что ф-ция вызывается из сервиса и не должна быть блокирующей
|
|
func (s *serviceMetric) SetConnectionDecrement() {
|
|
go func() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if s.connectionOpen != 0 {
|
|
s.connectionOpen = s.connectionOpen - 1
|
|
}
|
|
s.queue = append(s.queue, s.connectionOpen)
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
func (s *serviceMetric) SetP(value time.Duration) {
|
|
go func() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.tpr = append(s.tpr, value)
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
// SaveToStash сохраняем текущее значение расчитанных метрик в кармане
|
|
func (s *serviceMetric) SaveToStash() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.Stash.StateHost = s.StateHost
|
|
s.Stash.Connections = s.Connections
|
|
s.Stash.RPS = s.RPS
|
|
|
|
s.Stash.Queue_AVG = s.Queue_AVG
|
|
s.Stash.Queue_QTL_99 = s.Queue_QTL_99
|
|
s.Stash.Queue_QTL_90 = s.Queue_QTL_90
|
|
s.Stash.Queue_QTL_80 = s.Queue_QTL_80
|
|
|
|
s.Stash.TPR_AVG_MS = s.TPR_AVG_MS
|
|
s.Stash.TPR_QTL_MS_80 = s.TPR_QTL_MS_80
|
|
s.Stash.TPR_QTL_MS_90 = s.TPR_QTL_MS_90
|
|
s.Stash.TPR_QTL_MS_99 = s.TPR_QTL_MS_99
|
|
|
|
return
|
|
}
|
|
|
|
func (s *serviceMetric) Clear() {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.Connections = 0
|
|
s.connectionOpen = 0
|
|
s.queue = []int{}
|
|
s.tpr = []time.Duration{}
|
|
|
|
s.RPS = 0
|
|
s.Queue_AVG = 0.0
|
|
s.Queue_QTL_80 = 0.0
|
|
s.Queue_QTL_90 = 0.0
|
|
s.Queue_QTL_99 = 0.0
|
|
|
|
s.TPR_AVG_MS = 0.0
|
|
s.TPR_QTL_MS_80 = 0.0
|
|
s.TPR_QTL_MS_90 = 0.0
|
|
s.TPR_QTL_MS_99 = 0.0
|
|
|
|
return
|
|
}
|
|
|
|
func (s *serviceMetric) Get() (result Metrics) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
return s.Stash
|
|
}
|
|
|
|
func (s *serviceMetric) Generate() {
|
|
var val_Queue_QTL_80, val_Queue_QTL_90, val_Queue_QTL_99, val_Queue float32
|
|
var Queue_AVG, Queue_QTL_80, Queue_QTL_90, Queue_QTL_99 float32
|
|
var val_TPR_80, val_TPR_90, val_TPR_99, val_TPR float32
|
|
var AVG_TPR, QTL_TPR_80, QTL_TPR_90, QTL_TPR_99 float32
|
|
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
s.SetState() // БЕЗ БЛОКИРОВКИ получаю текущие метрики загрузки хоста
|
|
|
|
//////////////////////////////////////////////////////////
|
|
// расчитываем среднее кол-во запросо и квартили (средние значения после 80-90-99 процентов всех запросов)
|
|
//////////////////////////////////////////////////////////
|
|
|
|
// сортируем список
|
|
sort.Ints(s.queue)
|
|
|
|
lenQueue := len(s.queue)
|
|
|
|
if lenQueue != 0 {
|
|
len_Queue_QTL_80 := lenQueue * 8 / 10
|
|
len_Queue_QTL_90 := lenQueue * 9 / 10
|
|
len_Queue_QTL_99 := lenQueue * 99 / 100
|
|
|
|
for i, v := range s.queue {
|
|
vall := float32(v)
|
|
// суммируем значения которые после 80% других
|
|
if i > len_Queue_QTL_80 {
|
|
val_Queue_QTL_80 = val_Queue_QTL_80 + vall
|
|
}
|
|
// суммируем значения которые после 90% других
|
|
if i > len_Queue_QTL_90 {
|
|
val_Queue_QTL_90 = val_Queue_QTL_90 + vall
|
|
}
|
|
// суммируем значения которые после 99% других
|
|
if i > len_Queue_QTL_99 {
|
|
val_Queue_QTL_99 = val_Queue_QTL_99 + vall
|
|
}
|
|
|
|
val_Queue = val_Queue + vall
|
|
}
|
|
|
|
lQ := float32(lenQueue) - 1 // проверка на 0
|
|
if lQ == 0 {
|
|
lQ = 1
|
|
}
|
|
Queue_AVG = val_Queue / lQ
|
|
Queue_QTL_80 = val_Queue_QTL_80 / float32(lenQueue-len_Queue_QTL_80)
|
|
Queue_QTL_90 = val_Queue_QTL_90 / float32(lenQueue-len_Queue_QTL_90)
|
|
Queue_QTL_99 = val_Queue_QTL_99 / float32(lenQueue-len_Queue_QTL_99)
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////
|
|
// расчитываем среднее время запросо и квартили (средние значения после 80-90-99 процентов всех запросов)
|
|
//////////////////////////////////////////////////////////
|
|
|
|
// сортируем список
|
|
lenTPR := len(s.tpr)
|
|
if lenTPR != 0 {
|
|
|
|
timeInt := []float64{}
|
|
for _, v := range s.tpr {
|
|
timeInt = append(timeInt, float64(v.Microseconds()))
|
|
}
|
|
sort.Float64s(timeInt)
|
|
|
|
len_TPR_80 := lenTPR * 8 / 10
|
|
len_TPR_90 := lenTPR * 9 / 10
|
|
len_TPR_99 := lenTPR * 99 / 100
|
|
|
|
for i, v := range timeInt {
|
|
vall := float32(v)
|
|
// суммируем значения которые после 80% других
|
|
if i > len_TPR_80 {
|
|
val_TPR_80 = val_TPR_80 + vall
|
|
}
|
|
// суммируем значения которые после 90% других
|
|
if i > len_TPR_90 {
|
|
val_TPR_90 = val_TPR_90 + vall
|
|
}
|
|
// суммируем значения которые после 99% других
|
|
if i > len_TPR_99 {
|
|
val_TPR_99 = val_TPR_99 + vall
|
|
}
|
|
|
|
val_TPR = val_TPR + vall
|
|
}
|
|
|
|
lQ := float32(lenQueue) - 1
|
|
if lQ == 0 {
|
|
lQ = 1
|
|
}
|
|
AVG_TPR = val_TPR / lQ
|
|
QTL_TPR_80 = val_TPR_80 / float32(lenTPR-len_TPR_80)
|
|
QTL_TPR_90 = val_TPR_90 / float32(lenTPR-len_TPR_90)
|
|
QTL_TPR_99 = val_TPR_99 / float32(lenTPR-len_TPR_99)
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////
|
|
//////////////////////////////////////////////////////////
|
|
|
|
s.RPS = lenQueue / 10
|
|
|
|
s.Queue_AVG = Queue_AVG
|
|
s.Queue_QTL_80 = Queue_QTL_80
|
|
s.Queue_QTL_90 = Queue_QTL_90
|
|
s.Queue_QTL_99 = Queue_QTL_99
|
|
|
|
s.TPR_AVG_MS = AVG_TPR
|
|
s.TPR_QTL_MS_80 = QTL_TPR_80
|
|
s.TPR_QTL_MS_90 = QTL_TPR_90
|
|
s.TPR_QTL_MS_99 = QTL_TPR_99
|
|
|
|
return
|
|
}
|
|
|
|
func (s *serviceMetric) Middleware(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// увеличиваем счетчик активных сессий
|
|
s.SetConnectionIncrement()
|
|
next.ServeHTTP(w, r)
|
|
|
|
// уменьшаем счетчик активных сессий
|
|
s.SetConnectionDecrement()
|
|
})
|
|
}
|
|
|
|
// interval - интервалы времени, через которые статистика будет сбрасыватсья в лог
|
|
func NewMetric(ctx context.Context, interval time.Duration) (metrics ServiceMetric) {
|
|
m := sync.Mutex{}
|
|
t := StateHost{}
|
|
s := Metrics{
|
|
StateHost: t,
|
|
Queue_AVG: 0,
|
|
Queue_QTL_99: 0,
|
|
Queue_QTL_90: 0,
|
|
Queue_QTL_80: 0,
|
|
TPR_AVG_MS: 0,
|
|
TPR_QTL_MS_80: 0,
|
|
TPR_QTL_MS_90: 0,
|
|
TPR_QTL_MS_99: 0,
|
|
RPS: 0,
|
|
}
|
|
metrics = &serviceMetric{
|
|
Metrics: s,
|
|
Stash: s,
|
|
connectionOpen: 0,
|
|
queue: []int{},
|
|
mux: &m,
|
|
ctx: ctx,
|
|
}
|
|
|
|
go RunMetricLogger(ctx, metrics, interval)
|
|
|
|
return metrics
|
|
}
|
|
|
|
func RunMetricLogger(ctx context.Context, m ServiceMetric, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// сохраняем значение метрик в лог
|
|
m.Generate() // сгенерировали метрики
|
|
m.SaveToStash() // сохранили в карман
|
|
m.Clear() // очистили объект метрик для приема новых данных
|
|
//mes, _ := json.Marshal(m.Get())
|
|
//logger.Trace(string(mes)) // записали в лог из кармана
|
|
|
|
ticker = time.NewTicker(interval)
|
|
}
|
|
}
|
|
}
|
|
|