@@ -3,6 +3,7 @@ package main
33import (
44 "context"
55 "encoding/json"
6+ "errors"
67 "fmt"
78 "math"
89 "strings"
@@ -25,6 +26,31 @@ type Service struct {
2526 kubeManager KubernetesManager
2627 settings * ScaleSettings
2728 currentRunnerCount int
29+ metricsExporter metricsExporter
30+ errs []error
31+ }
32+
33+ func WithPrometheusMetrics (conf RunnerScaleSetListenerConfig ) func (* Service ) {
34+ return func (svc * Service ) {
35+ parsedURL , err := actions .ParseGitHubConfigFromURL (conf .ConfigureUrl )
36+ if err != nil {
37+ svc .errs = append (svc .errs , err )
38+ }
39+
40+ svc .metricsExporter .withBaseLabels (baseLabels {
41+ scaleSetName : conf .EphemeralRunnerSetName ,
42+ scaleSetNamespace : conf .EphemeralRunnerSetNamespace ,
43+ enterprise : parsedURL .Enterprise ,
44+ organization : parsedURL .Organization ,
45+ repository : parsedURL .Repository ,
46+ })
47+ }
48+ }
49+
50+ func WithLogger (logger logr.Logger ) func (* Service ) {
51+ return func (s * Service ) {
52+ s .logger = logger .WithName ("service" )
53+ }
2854}
2955
3056func NewService (
@@ -33,7 +59,7 @@ func NewService(
3359 manager KubernetesManager ,
3460 settings * ScaleSettings ,
3561 options ... func (* Service ),
36- ) * Service {
62+ ) ( * Service , error ) {
3763 s := & Service {
3864 ctx : ctx ,
3965 rsClient : rsClient ,
@@ -47,7 +73,11 @@ func NewService(
4773 option (s )
4874 }
4975
50- return s
76+ if len (s .errs ) > 0 {
77+ return nil , errors .Join (s .errs ... )
78+ }
79+
80+ return s , nil
5181}
5282
5383func (s * Service ) Start () error {
@@ -81,6 +111,8 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error {
81111 "busy runners" , message .Statistics .TotalBusyRunners ,
82112 "idle runners" , message .Statistics .TotalIdleRunners )
83113
114+ s .metricsExporter .publishStatistics (message .Statistics )
115+
84116 if message .MessageType != "RunnerScaleSetJobMessages" {
85117 s .logger .Info ("skip message with unknown message type." , "messageType" , message .MessageType )
86118 return nil
@@ -110,27 +142,54 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error {
110142 if err := json .Unmarshal (message , & jobAvailable ); err != nil {
111143 return fmt .Errorf ("could not decode job available message. %w" , err )
112144 }
113- s .logger .Info ("job available message received." , "RequestId" , jobAvailable .RunnerRequestId )
145+ s .logger .Info (
146+ "job available message received." ,
147+ "RequestId" ,
148+ jobAvailable .RunnerRequestId ,
149+ )
114150 availableJobs = append (availableJobs , jobAvailable .RunnerRequestId )
115151 case "JobAssigned" :
116152 var jobAssigned actions.JobAssigned
117153 if err := json .Unmarshal (message , & jobAssigned ); err != nil {
118154 return fmt .Errorf ("could not decode job assigned message. %w" , err )
119155 }
120- s .logger .Info ("job assigned message received." , "RequestId" , jobAssigned .RunnerRequestId )
156+ s .logger .Info (
157+ "job assigned message received." ,
158+ "RequestId" ,
159+ jobAssigned .RunnerRequestId ,
160+ )
161+ // s.metricsExporter.publishJobAssigned(&jobAssigned)
121162 case "JobStarted" :
122163 var jobStarted actions.JobStarted
123164 if err := json .Unmarshal (message , & jobStarted ); err != nil {
124165 return fmt .Errorf ("could not decode job started message. %w" , err )
125166 }
126- s .logger .Info ("job started message received." , "RequestId" , jobStarted .RunnerRequestId , "RunnerId" , jobStarted .RunnerId )
167+ s .logger .Info (
168+ "job started message received." ,
169+ "RequestId" ,
170+ jobStarted .RunnerRequestId ,
171+ "RunnerId" ,
172+ jobStarted .RunnerId ,
173+ )
174+ s .metricsExporter .publishJobStarted (& jobStarted )
127175 s .updateJobInfoForRunner (jobStarted )
128176 case "JobCompleted" :
129177 var jobCompleted actions.JobCompleted
130178 if err := json .Unmarshal (message , & jobCompleted ); err != nil {
131179 return fmt .Errorf ("could not decode job completed message. %w" , err )
132180 }
133- s .logger .Info ("job completed message received." , "RequestId" , jobCompleted .RunnerRequestId , "Result" , jobCompleted .Result , "RunnerId" , jobCompleted .RunnerId , "RunnerName" , jobCompleted .RunnerName )
181+ s .logger .Info (
182+ "job completed message received." ,
183+ "RequestId" ,
184+ jobCompleted .RunnerRequestId ,
185+ "Result" ,
186+ jobCompleted .Result ,
187+ "RunnerId" ,
188+ jobCompleted .RunnerId ,
189+ "RunnerName" ,
190+ jobCompleted .RunnerName ,
191+ )
192+ s .metricsExporter .publishJobCompleted (& jobCompleted )
134193 default :
135194 s .logger .Info ("unknown job message type." , "messageType" , messageType .MessageType )
136195 }
@@ -146,13 +205,15 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error {
146205
147206func (s * Service ) scaleForAssignedJobCount (count int ) error {
148207 targetRunnerCount := int (math .Max (math .Min (float64 (s .settings .MaxRunners ), float64 (count )), float64 (s .settings .MinRunners )))
208+ s .metricsExporter .publishDesiredRunners (targetRunnerCount )
149209 if targetRunnerCount != s .currentRunnerCount {
150210 s .logger .Info ("try scale runner request up/down base on assigned job count" ,
151211 "assigned job" , count ,
152212 "decision" , targetRunnerCount ,
153213 "min" , s .settings .MinRunners ,
154214 "max" , s .settings .MaxRunners ,
155- "currentRunnerCount" , s .currentRunnerCount )
215+ "currentRunnerCount" , s .currentRunnerCount ,
216+ )
156217 err := s .kubeManager .ScaleEphemeralRunnerSet (s .ctx , s .settings .Namespace , s .settings .ResourceName , targetRunnerCount )
157218 if err != nil {
158219 return fmt .Errorf ("could not scale ephemeral runner set (%s/%s). %w" , s .settings .Namespace , s .settings .ResourceName , err )
@@ -173,7 +234,8 @@ func (s *Service) updateJobInfoForRunner(jobInfo actions.JobStarted) {
173234 "workflowRef" , jobInfo .JobWorkflowRef ,
174235 "workflowRunId" , jobInfo .WorkflowRunId ,
175236 "jobDisplayName" , jobInfo .JobDisplayName ,
176- "requestId" , jobInfo .RunnerRequestId )
237+ "requestId" , jobInfo .RunnerRequestId ,
238+ )
177239 err := s .kubeManager .UpdateEphemeralRunnerWithJobInfo (s .ctx , s .settings .Namespace , jobInfo .RunnerName , jobInfo .OwnerName , jobInfo .RepositoryName , jobInfo .JobWorkflowRef , jobInfo .JobDisplayName , jobInfo .WorkflowRunId , jobInfo .RunnerRequestId )
178240 if err != nil {
179241 s .logger .Error (err , "could not update ephemeral runner with job info" , "runnerName" , jobInfo .RunnerName , "requestId" , jobInfo .RunnerRequestId )
0 commit comments