Asynchronous MultiThreaded SSH engine for Web (Net Core 6, Linux) - Part 7,8 (QuartzService/Jobs and Singleton CacheService). ConcurrentDictionary, Interlocked.Increment, SyncLock, Closure variable for MultiThreading, Cron Configuration.
7. Singleton CacheService.
On one side, every SignalR ConnectionID, together with its UserID, is saved by the component described here into a dedicated singleton service named NotificationCacheService. On the other side, every notification (with a truncated result of the SSH command) is also stored in NotificationCacheService. The service is exposed through the INotificationCacheService interface.
The implementation is quite simple: in general it only stores and deletes these two entities in a MyConcurrentDictionary.
What is MyConcurrentDictionary? It is an ordinary thread-safe ConcurrentDictionary with one additional method that I use in many places in this component, the Asynchronous MultiThreaded SSH engine for Web.
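The listing of MyConcurrentDictionary is not reproduced here, so the following is only a minimal sketch of what such a class might look like. The method name PrintKeys is taken from the job code later in this article; its body, and the Demo module, are assumptions made for illustration.

```vbnet
Imports System
Imports System.Collections.Concurrent

' Hypothetical sketch: a ConcurrentDictionary with one extra logging helper.
Public Class MyConcurrentDictionary(Of TKey, TValue)
    Inherits ConcurrentDictionary(Of TKey, TValue)

    ' Returns the current keys as one comma-separated string, handy for log lines.
    ' Keys hands back a snapshot copy, so enumerating it is safe while other threads write.
    Public Function PrintKeys() As String
        Return String.Join(",", Keys)
    End Function
End Class

Public Module MyConcurrentDictionaryDemo
    Public Sub Main()
        Dim Working As New MyConcurrentDictionary(Of Integer, String)
        Working.TryAdd(1, "uptime")
        Working.TryAdd(2, "df -h")
        ' Key order is not guaranteed by ConcurrentDictionary.
        Console.WriteLine(Working.PrintKeys())
    End Sub
End Module
```

Inheriting instead of wrapping keeps every standard method (TryAdd, TryGetValue, TryRemove) available unchanged, which is how the jobs below use the dictionary.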
8. QuartzService/Jobs. ConcurrentDictionary, Interlocked.Increment, SyncLock, Closure variable for MultiThreading, Cron Configuration.
Now we are ready to move on to the main part of the component I am describing: the Quartz service and the 4 jobs needed to run the Asynchronous MultiThreaded SSH engine for Web. The general Quartz documentation is at https://www.quartz-scheduler.net/documentation/quartz-3.x/quick-start.html.
Two years ago I uploaded general project templates for a Quartz service and jobs to GitHub https://github.com/ViacheslavUKR/VB-NET-ASP-NET-Core-3.1.Quartz.Service and to the Visual Studio Marketplace https://marketplace.visualstudio.com/items?itemName=vb-net-com.QuartzProjectTemplate.
This component contains a practical implementation of that general idea. But first we need to recall what is needed to configure a Quartz service at all.
First, we need to add a reference to the project and add the service to the DI container.
Then we need to configure QuartzHostedService and the JobFactory and JobSchedule classes. That's it; I don't even configure Triggers separately in this project.
We also need to configure the Cron control variable. Many sites help to create a correct expression:
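The JobFactory and JobSchedule classes are not listed in this article, so here is a minimal sketch of how they are commonly written for Quartz.NET 3.x with the Microsoft DI container. It assumes the Quartz and Microsoft.Extensions.DependencyInjection packages are referenced; the class bodies and the QuartzRegistration module are illustrative assumptions, not the project's actual code.

```vbnet
Imports System
Imports Microsoft.Extensions.DependencyInjection
Imports Quartz
Imports Quartz.Impl
Imports Quartz.Spi

' Pairs a job type with the Cron expression that drives it.
Public Class JobSchedule
    Public Sub New(jobClass As Type, cron As String)
        JobType = jobClass
        CronExpression = cron
    End Sub

    Public ReadOnly Property JobType As Type
    Public ReadOnly Property CronExpression As String
End Class

' Lets Quartz obtain job instances from the DI container instead of calling New itself.
Public Class JobFactory
    Implements IJobFactory

    Private ReadOnly _provider As IServiceProvider

    Public Sub New(provider As IServiceProvider)
        _provider = provider
    End Sub

    Public Function NewJob(bundle As TriggerFiredBundle, scheduler As IScheduler) As IJob Implements IJobFactory.NewJob
        Return CType(_provider.GetRequiredService(bundle.JobDetail.JobType), IJob)
    End Function

    Public Sub ReturnJob(job As IJob) Implements IJobFactory.ReturnJob
        ' Jobs are registered as singletons and owned by the container: nothing to release.
    End Sub
End Class

' Hypothetical helper that registers the Quartz plumbing in the DI container.
Public Module QuartzRegistration
    Public Sub AddQuartzInfrastructure(services As IServiceCollection)
        services.AddSingleton(Of IJobFactory, JobFactory)()
        services.AddSingleton(Of ISchedulerFactory, StdSchedulerFactory)()
    End Sub
End Module
```

In this pattern a hosted service (QuartzHostedService in this project) would typically read every registered JobSchedule at startup and build one Cron trigger per job from it, which is why no trigger needs to be configured by hand.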
- http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
- https://www.freeformatter.com/cron-expression-generator-quartz.html
- https://crontab.guru/
Then we need to add a correct Cron expression to each Job.
The main point when working with Quartz jobs is that this is multithreaded code: the next run of a job usually starts while the previous run has not finished yet.
Therefore the first step is to debug your algorithm in simple single-threaded mode. For this purpose I used the following job initialization, which fires the job at the start of the next minute (and after that only once an hour, at that same minute).
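A Quartz Cron expression has six or seven fields: seconds, minutes, hours, day-of-month, month, day-of-week and an optional year. As a small sketch (assuming the Quartz package is referenced; the sample expressions are mine, not taken from this project), an expression can be checked in code before it is attached to a job:

```vbnet
Imports System
Imports Quartz

Public Module CronDemo
    ' Samples, in order: every 5 seconds; every minute; 10:15 on weekdays;
    ' and a deliberately wrong one (minute 60 is outside the allowed 0-59 range).
    Public ReadOnly Samples As String() = {
        "0/5 * * * * ?",
        "0 0/1 * * * ?",
        "0 15 10 ? * MON-FRI",
        "0 60 * ? * * *"
    }

    Public Sub Main()
        For Each Expr In Samples
            If CronExpression.IsValidExpression(Expr) Then
                ' GetNextValidTimeAfter returns Nothing when the expression never fires again.
                Dim NextFire = New CronExpression(Expr).GetNextValidTimeAfter(DateTimeOffset.Now)
                Console.WriteLine($"{Expr} -> next fire {NextFire}")
            Else
                Console.WriteLine($"{Expr} -> invalid")
            End If
        Next
    End Sub
End Module
```

Checking expressions this way at startup makes a typo visible immediately, instead of as a job that silently never fires.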
services.AddSingleton(Of SShJob)()
Dim CurrentMin As Integer = Now.Minute
' Mod 60 keeps the minute field inside 0-59 when the current minute is 59
Dim SshJobShed = New JobSchedule(GetType(SShJob), $"0 {(CurrentMin + 1) Mod 60} * ? * * *")
services.AddSingleton(SshJobShed)
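Once the algorithm works in single-threaded mode, overlapping runs have to be handled. The sketch below uses plain .NET without Quartz, and the class and member names are hypothetical. It shows the two primitives used by all jobs in this article: Interlocked.Increment for a run counter that stays correct under concurrency, and SyncLock so that only one run at a time is inside the critical section.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

' Hypothetical stand-in for a job whose Execute may be called from several threads at once.
Public Class OverlapDemoJob
    Private Counter As Integer               ' total number of Execute calls
    Private InsideGate As Integer            ' runs currently inside the SyncLock
    Private ReadOnly Gate As New Object

    Public Property MaxInsideGate As Integer ' highest concurrency seen inside the lock

    Public ReadOnly Property Runs As Integer
        Get
            Return Volatile.Read(Counter)
        End Get
    End Property

    Public Function Execute() As Task
        Interlocked.Increment(Counter)       ' atomic, no lock needed
        SyncLock Gate                        ' later runs wait here until the earlier one leaves
            InsideGate += 1
            MaxInsideGate = Math.Max(MaxInsideGate, InsideGate)
            Thread.Sleep(5)                  ' simulated work
            InsideGate -= 1
        End SyncLock
        Return Task.CompletedTask
    End Function
End Class

Public Module OverlapDemo
    Public Sub Main()
        Dim Job As New OverlapDemoJob
        ' 20 overlapping runs, similar to a scheduler firing faster than the job finishes
        Parallel.For(0, 20, Sub(i) Job.Execute().Wait())
        Console.WriteLine($"Runs: {Job.Runs}, max inside gate: {Job.MaxInsideGate}")
        ' prints "Runs: 20, max inside gate: 1"
    End Sub
End Module
```

Quartz also offers the DisallowConcurrentExecution attribute, which prevents overlapping runs of the same job at the scheduler level; the jobs in this article rely on SyncLock instead.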
My component has four jobs now.
- SShJob - this job orders the queue for each Server and Vm and then calls the SSH engine I described on this page - Asynchronous MultiThreaded SSH engine for Web (Net Core 6, Linux) - Part 1,2 (Database and SSH client). StreamReader/ReadToEndAsync, Extension/Inherits/Overloads, BeginExecute/EndExecute/IAsyncResult/IProgress(Of T)/Async/Await/CancellationToken/Task.Run/Thread.Yield. The outcome of this job is the Result and Error fields in the database.
- NotificationCacheState - this job lets you check visually, in the log, which SignalR clients are still alive and which notifications are ready to be sent to clients.
- BackendHubNotificationJob - this job calls the Backend API I describe in the last part of this series, Asynchronous MultiThreaded SSH engine for Web (Net Core 6, Linux) - Part 9,10 (BackendAPI for NotificationController and my technique to testing this engine with xUnit), and sends the outcome of the SSH command to the Backend endpoint (truncated, without the result itself, because the result string can be very long).
- SendTestNotificationToClient - this job is similar to a ping service: it checks the state of the SignalR clients, that is, whether they are still alive.
The programming technique for a multithreaded module is quite special; I usually use a technique with a Closure variable, which you can see in the code below. You can see how I create the queue for Servers and VMs and run my SshEngine.
Imports System
Imports System.Net.Http
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Imports BackendAPI.Helper
Imports BackendAPI.KVM
Imports BackendAPI.Vm
Imports BackendAPI.Model
Imports BackendAPI.Services
Imports Microsoft.Extensions.DependencyInjection
Imports Microsoft.Extensions.Logging
Imports Newtonsoft.Json
Imports Quartz
Imports Quartz.Spi
Imports System.Collections.Concurrent
Imports BackendAPI.Notification

Public Class SShJob
    Implements IJob

    Private ReadOnly _logger As ILogger(Of SShJob)
    Private ReadOnly _Aes As AesCryptor
    Private ReadOnly _Db As ApplicationDbContext
    Private Counter As Integer

    Public Sub New(ByVal logger As ILogger(Of SShJob), Cryptor As IAesCryptor, DbContext As ApplicationDbContext)
        _logger = logger
        _Aes = Cryptor
        _Db = DbContext
        WorkingServer = New MyConcurrentDictionary(Of Integer, BashJob)
        WorkingVm = New MyConcurrentDictionary(Of Integer, BashJob)
    End Sub

    ' Servers and VMs that currently have an SSH command in progress
    Private WorkingServer As MyConcurrentDictionary(Of Integer, BashJob)
    Private WorkingVm As MyConcurrentDictionary(Of Integer, BashJob)
    Private RequestNextJob As New Object 'multithreading code !!!

    Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
        Interlocked.Increment(Counter)
        '_logger.LogInformation($"SshJob Started {Now}, InstanceID {context.FireInstanceId}")
        ' Only one run at a time may pick the next job from the queue
        SyncLock RequestNextJob
            Dim GetNextJobTsk = BashJob.GetNextJob(_Db)
            If GetNextJobTsk.Result.Item2 Is Nothing Then
                If GetNextJobTsk.Result.Item1.Count > 0 Then
                    Dim NextJob As BashJob = GetNextJobTsk.Result.Item1(0)
                    If NextJob.toVm = 0 Then
                        _logger.LogInformation($"Job for Server {NextJob.toServer} will be processed")
                        Dim Val1 As BashJob = Nothing
                        Dim ServerWorking As Boolean = WorkingServer.TryGetValue(NextJob.toServer, Val1)
                        If Not ServerWorking Then
                            Dim AddSucess1 = WorkingServer.TryAdd(NextJob.toServer, NextJob)
                            If AddSucess1 Then
                                ' Each thread gets its own closure object, so threads share no mutable state
                                Dim ServerThread = New Thread(Sub() ServerJob(NextJob, New ServerClosure))
                                ServerThread.Start()
                            Else
                                Return Task.CompletedTask
                            End If
                        Else
                            ' This server is still busy with a previous command
                            Return Task.CompletedTask
                        End If
                    ElseIf NextJob.toServer = 0 Then
                        _logger.LogInformation($"Job for Vm {NextJob.toVm} will be processed")
                        Dim Val2 As BashJob = Nothing
                        Dim VmWorking = WorkingVm.TryGetValue(NextJob.toVm, Val2)
                        If Not VmWorking Then
                            Dim AddSucess1 = WorkingVm.TryAdd(NextJob.toVm, NextJob)
                            If AddSucess1 Then
                                Dim VmThread = New Thread(Sub() VmJob(NextJob, New VmClosure))
                                VmThread.Start()
                            Else
                                Return Task.CompletedTask
                            End If
                        Else
                            ' This VM is still busy with a previous command
                            Return Task.CompletedTask
                        End If
                    End If
                Else
                    _logger.LogInformation($"SshJob ({Counter.ToString}) {Now.ToString} Query empty")
                End If
            Else
                _logger.LogInformation($"SshJob Query error {GetNextJobTsk.Result.Item2.Message}")
            End If
        End SyncLock
        ' Quartz awaits the returned Task, so it must never be Nothing
        Return Task.CompletedTask
    End Function

    Async Sub ServerJob(ByVal Prm As BashJob, ByVal Closure As ServerClosure)
        Try
            _logger.LogInformation($"ServerSsh JobStart ({Counter.ToString}) {Now.ToString}, Server:{Prm.toServer}, {Prm.i}:[{Prm.Command}], WorkingServer {WorkingServer.PrintKeys}")
            Closure.MarkStartTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsStarted`=1 WHERE `i`={Prm.i}")
            Await Closure.MarkStartTsk
            Closure.Server = New ServerBashAsync(_Db, _Aes, Prm.toServer, Prm.SshDecryptPass)
            Closure.Connect = Closure.Server.SSHServerConnect()
            If Closure.Connect.Item1 IsNot Nothing Then
                Closure.ServerBashRetTsk = Closure.Server.Bash(Prm.Command)
                Await Closure.ServerBashRetTsk

                ' Replace quote characters so the output can be embedded in the SQL text below
                Closure.Server.SshErrMsg.ForEach(Sub(X) Closure.ClearErr.Add(X.Replace("'", "^").Replace("`", "~")))
                Closure.Server.SshOutput.ForEach(Sub(X) Closure.ClearOutput.Add(X.Replace("'", "^").Replace("`", "~")))
                Closure.IsCompletedTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1," &
                    $"`Result`='{Left(String.Join(Environment.NewLine, Closure.ClearOutput), 65000)}'," &
                    $"`Error`='{Left(Closure.ServerBashRetTsk.Result & vbCrLf & String.Join(Environment.NewLine, Closure.ClearErr), 65000)}'," &
                    $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                Await Closure.IsCompletedTsk
            Else
                Closure.IsCompletedWithErrTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1,`Error`='Not connected'," &
                    $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                Await Closure.IsCompletedWithErrTsk
            End If
            ' Removed here so the log line below shows the updated key list
            WorkingServer.TryRemove(Prm.toServer, Prm)
            _logger.LogInformation($"ServerSsh JobFinish ({Counter.ToString}) {Now.ToString}, Server:{Prm.toServer}, {Prm.i}:[{Prm.Command}], WorkingServer {WorkingServer.PrintKeys}")
        Catch ex As Exception
            _logger.LogInformation($"ServerSsh ({Counter.ToString}) {Now.ToString} Server:{Prm.toServer} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
        Finally
            ' Also frees the server after an exception; a second TryRemove is harmless
            WorkingServer.TryRemove(Prm.toServer, Prm)
        End Try
    End Sub

    Async Sub VmJob(ByVal Prm As BashJob, ByVal Closure As VmClosure)
        Try
            Closure.GetVmNameTsk = _Db.RawSqlQueryAsync(Of String)($"Select `Name` FROM cryptochestmax.`VM` WHERE `i`={Prm.toVm};", Function(X) X("Name"))
            Await Closure.GetVmNameTsk
            If Closure.GetVmNameTsk.Result.Item2 Is Nothing Then
                _logger.LogInformation($"VmSsh JobStart ({Counter.ToString}) {Now.ToString}, Vm{Prm.toVm}, {Prm.i}:[{Prm.Command}], WorkingVm {WorkingVm.PrintKeys}")
                Closure.MarkStartTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsStarted`=1 WHERE `i`={Prm.i}")
                Await Closure.MarkStartTsk
                Closure.Vm = New VmBashAsync(_Db, _Aes, Closure.GetVmNameTsk.Result.Item1(0), Prm.SshDecryptPass)
                Closure.Connect = Closure.Vm.SSHServerConnect()
                If Closure.Connect.Item1 IsNot Nothing Then
                    Closure.VmBashRetTsk = Closure.Vm.Bash(Prm.Command)
                    Await Closure.VmBashRetTsk

                    ' Replace quote characters so the output can be embedded in the SQL text below
                    Closure.Vm.SshErrMsg.ForEach(Sub(X) Closure.ClearErr.Add(X.Replace("'", "^").Replace("`", "~")))
                    Closure.Vm.SshOutput.ForEach(Sub(X) Closure.ClearOutput.Add(X.Replace("'", "^").Replace("`", "~")))
                    Closure.IsCompletedTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1," &
                        $"`Result`='{Left(String.Join(Environment.NewLine, Closure.ClearOutput), 65000)}'," &
                        $"`Error`='{Left(Closure.VmBashRetTsk.Result & vbCrLf & String.Join(Environment.NewLine, Closure.ClearErr), 65000)}'," &
                        $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                    Await Closure.IsCompletedTsk
                Else
                    Closure.IsCompletedWithErrTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1,`Error`='Not connected'," &
                        $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                    Await Closure.IsCompletedWithErrTsk
                End If
            Else
                _logger.LogInformation($"VmSsh ({Counter.ToString}) {Now.ToString} Vm {Prm.toVm} is nothing")
            End If
            ' Removed here so the log line below shows the updated key list
            WorkingVm.TryRemove(Prm.toVm, Prm)
            _logger.LogInformation($"VmSsh JobFinish ({Counter.ToString}) {Now.ToString}, Vm:{Prm.toVm}, {Prm.i}:[{Prm.Command}], WorkingVm {WorkingVm.PrintKeys}")
        Catch ex As Exception
            _logger.LogInformation($"VmSsh ({Counter.ToString}) {Now.ToString} Vm:{Prm.toVm} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
        Finally
            ' Also frees the VM after an exception; a second TryRemove is harmless
            WorkingVm.TryRemove(Prm.toVm, Prm)
        End Try
    End Sub
End Class
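The ServerClosure and VmClosure classes are not listed in this article. The idea behind them, as far as it can be read from the job code, is that every thread receives its own object holding all intermediate state, so no mutable local data is shared between threads. A minimal standalone sketch of that idea, with hypothetical names (WorkClosure, Worker, Sanitize) and a plain string standing in for the SSH command:

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

' Hypothetical per-thread state holder, loosely modelled on the closures used by SShJob.
Public Class WorkClosure
    Public Property Started As DateTime
    Public Property ClearOutput As New List(Of String)
End Class

Public Module ClosureDemo
    ' Same character replacement as in the job code: quotes become ^ and backticks become ~.
    Public Function Sanitize(Line As String) As String
        Return Line.Replace("'", "^").Replace("`", "~")
    End Function

    ' All intermediate results live in the closure object that belongs to this thread only.
    Public Sub Worker(Command As String, Closure As WorkClosure)
        Closure.Started = DateTime.Now
        Closure.ClearOutput.Add(Sanitize(Command))
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: {String.Join(";", Closure.ClearOutput)}")
    End Sub

    Public Sub Main()
        Dim Threads As New List(Of Thread)
        For Each Cmd In {"echo 'a'", "ls `pwd`"}
            Dim Captured = Cmd   ' local copy, so each lambda captures its own value
            Dim T = New Thread(Sub() Worker(Captured, New WorkClosure))
            Threads.Add(T)
            T.Start()
        Next
        Threads.ForEach(Sub(T) T.Join())
    End Sub
End Module
```

Because a fresh WorkClosure is created inside each thread's lambda, two commands running at the same time can never overwrite each other's output lists.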
The other jobs are simpler.
Imports System
Imports System.Net.Http
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Imports BackendAPI.Model
Imports BackendAPI.Notification
Imports BackendAPI.Services
Imports Microsoft.Extensions.Configuration
Imports Microsoft.Extensions.DependencyInjection
Imports Microsoft.Extensions.Logging
Imports Newtonsoft.Json
Imports Quartz
Imports Quartz.Spi
Imports BackendAPI
Imports System.Net
Imports BackendAPI.KVM
Imports BackendAPI.Helper

Public Class NotificationCacheState
    Implements IJob

    Private ReadOnly NotificationCacheStateUrl As String
    ' Logger category is this job's own class, so its log lines are not attributed to SShJob
    Private ReadOnly _logger As ILogger(Of NotificationCacheState)
    Private ReadOnly _Db As ApplicationDbContext
    Private Counter As Integer
    Private NextRequest As New Object 'multithreading code !!!
    Private ReadOnly Request As MyWebClient
    Private ReadOnly NotificationTokenLogin As String
    Private ReadOnly NotificationTokenPass As String

    Public Sub New(ByVal logger As ILogger(Of NotificationCacheState), DbContext As ApplicationDbContext, Configuration As IConfiguration)
        _logger = logger
        _Db = DbContext
        Request = New MyWebClient
        Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
        Request.Headers.Add("Content-Type", "application/json")
        NotificationCacheStateUrl = Configuration.GetValue(Of String)("Backend:NotificationCacheState")
        NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
        NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")
    End Sub

    Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
        Interlocked.Increment(Counter)
        ' The shared MyWebClient instance is not thread safe, so requests are serialized
        SyncLock NextRequest
            Request.Headers.Add("Content-Type", "application/json")
            Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
            Request.Headers.Clear()
            Request.Headers.Add("Authorization", "Bearer: " & Token)
            Request.Headers.Add("Content-Type", "application/json")
            Try
                Dim Response = Request.DownloadString(NotificationCacheStateUrl)
                ' Log only when the cache actually holds something
                If Response <> "{""notificationKey"":""No"",""signalRConnectionKeys"":""No""}" Then
                    _logger.LogInformation($"NotificationCacheState ({Counter.ToString}) {Now.ToString}, {Response}")
                End If
            Catch ex As WebException
                Dim Resp As String = ""
                Dim Stream = ex.Response?.GetResponseStream()
                If Stream IsNot Nothing Then
                    Dim Sr = New IO.StreamReader(Stream)
                    Resp = Sr.ReadToEnd
                End If
                _logger.LogInformation($"NotificationCacheState Error ({Counter.ToString}) {Now.ToString}, BackendHub Response: {Resp & vbCrLf & ex.Message}")
            End Try
        End SyncLock
        ' Quartz awaits the returned Task, so it must never be Nothing
        Return Task.CompletedTask
    End Function
End Class

Imports System
Imports System.Net.Http
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Imports BackendAPI.Model
Imports BackendAPI.Notification
Imports BackendAPI.Services
Imports Microsoft.Extensions.Configuration
Imports Microsoft.Extensions.DependencyInjection
Imports Microsoft.Extensions.Logging
Imports Newtonsoft.Json
Imports Quartz
Imports Quartz.Spi
Imports BackendAPI
Imports System.Net
Imports BackendAPI.Kvm
Imports BackendAPI.Helper

Public Class SendTestNotificationToClient
    Implements IJob

    Private ReadOnly _logger As ILogger(Of SendTestNotificationToClient)
    Private Counter As Integer
    Private ReadOnly _TestUrl As String
    Private ReadOnly NotificationTokenLogin As String
    Private ReadOnly NotificationTokenPass As String
    Private ReadOnly Request As MyWebClient
    Private RequestNextJob As New Object 'multithreading code !!!

    Public Sub New(ByVal logger As ILogger(Of SendTestNotificationToClient), Cryptor As IAesCryptor, DbContext As ApplicationDbContext, Configuration As IConfiguration)
        _logger = logger
        _TestUrl = Configuration.GetValue(Of String)("Backend:TestNotification")
        NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
        NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")
        Request = New MyWebClient
        Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
        Request.Headers.Add("Content-Type", "application/json")
    End Sub

    Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
        Interlocked.Increment(Counter)
        ' The shared MyWebClient instance is not thread safe, so requests are serialized
        SyncLock RequestNextJob
            Request.Headers.Add("Content-Type", "application/json")
            Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
            Request.Headers.Clear()
            Request.Headers.Add("Authorization", "Bearer: " & Token)
            Request.Headers.Add("Content-Type", "application/json")
            Try
                Dim Response = Request.DownloadString(_TestUrl)
                _logger.LogInformation(Response)
                ' Note: Response is a String, so GetType.Name is always "String" here and
                ' neither branch below is taken; the line above is the effective log output
                If Response.GetType.Name = "OkObjectResult" Then
                    _logger.LogInformation($"SendTestNotificationToClient Job Finish ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString} ")
                ElseIf Response.GetType.Name = "ObjectResult" Then
                    _logger.LogInformation($"SendTestNotificationToClient Job Notification Error ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString}")
                End If
            Catch ex As WebException
                Dim Resp As String = ""
                Dim Stream = ex.Response?.GetResponseStream()
                If Stream IsNot Nothing Then
                    Dim Sr = New IO.StreamReader(Stream)
                    Resp = Sr.ReadToEnd
                End If
                _logger.LogInformation($"SendTestNotificationToClient Job Error ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString}, BackendHub Response: {Resp & vbCrLf & ex.Message}")
            End Try
        End SyncLock
        ' Quartz awaits the returned Task, so it must never be Nothing
        Return Task.CompletedTask
    End Function
End Class

Imports System
Imports System.Net.Http
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Imports BackendAPI.Model
Imports BackendAPI.Notification
Imports BackendAPI.Services
Imports Microsoft.Extensions.Configuration
Imports Microsoft.Extensions.DependencyInjection
Imports Microsoft.Extensions.Logging
Imports Newtonsoft.Json
Imports Quartz
Imports Quartz.Spi
Imports BackendAPI
Imports System.Net
Imports BackendAPI.KVM
Imports BackendAPI.Helper

'https://stackoverflow.com/questions/9343594/how-to-call-asynchronous-method-from-synchronous-method-in-c
Public Class BackendHubNotificationJob
    Implements IJob

    Private ReadOnly _NotificationUrl As String
    Private ReadOnly _logger As ILogger(Of BackendHubNotificationJob)
    Private ReadOnly _Aes As AesCryptor
    Private ReadOnly _Db As ApplicationDbContext
    Private Counter As Integer
    Private ReadOnly Request As MyWebClient
    Private RequestNextJob As New Object 'multithreading code !!!
    Private ReadOnly NotificationTokenLogin As String
    Private ReadOnly NotificationTokenPass As String

    Public Sub New(ByVal logger As ILogger(Of BackendHubNotificationJob), Configuration As IConfiguration, Cryptor As IAesCryptor, DbContext As ApplicationDbContext)
        _logger = logger
        _NotificationUrl = Configuration.GetValue(Of String)("Backend:NotificationUrl")
        NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
        NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")
        _Aes = Cryptor
        _Db = DbContext
        Request = New MyWebClient
        Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
        Request.Headers.Add("Content-Type", "application/json")
    End Sub

    'multithreading code !!!
    Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
        Interlocked.Increment(Counter)
        SyncLock RequestNextJob
            '_logger.LogInformation($"BackendHubNotification JobStarted ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString} ")
            ' Pick one completed job whose subscriber has not been notified yet
            Dim GetNextJobTsk = BashJob.GetOneNextNotifiedJob(_Db, "`IsCompleted`=1 and `IsBackendHubNotified`=0 and `IsSubscriberError` is NULL and `IsSubscriberNotified` is NULL and not (`SubscribeId` is NULL)")
            If GetNextJobTsk.Result.Item2 Is Nothing Then
                If GetNextJobTsk.Result.Item1.Count > 0 Then
                    Dim NextJob As BashJobFinishedRequest = GetNextJobTsk.Result.Item1(0)
                    _logger.LogInformation($"BackendHubNotification ({Counter.ToString}) found notification {NextJob.i}.")
                    Request.Headers.Add("Content-Type", "application/json")
                    Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
                    Request.Headers.Clear()
                    Request.Headers.Add("Authorization", "Bearer: " & Token)
                    Request.Headers.Add("Content-Type", "application/json")
                    ' The SSH result itself is not sent, because it can be very long
                    Dim PostPrm = New BashJobFinishedRequest With {
                        .i = NextJob.i,
                        .CrDate = NextJob.CrDate,
                        .toServer = NextJob.toServer,
                        .toVm = NextJob.toVm,
                        .toUser = NextJob.toUser,
                        .SubscribeId = NextJob.SubscribeId,
                        .Command = NextJob.Command,
                        .Comment = NextJob.Comment,
                        .LastUpdate = NextJob.LastUpdate}
                    Dim PostData = JsonConvert.SerializeObject(PostPrm)
                    Try
                        Dim Response = Encoding.UTF8.GetString(Request.UploadData(_NotificationUrl, Encoding.UTF8.GetBytes(PostData)))
                        _logger.LogInformation(Response)
                        If Response.StartsWith("{""error"":"""",") Then '{"error":"","notificationKey":"No","signalRConnectionKeys":"No"}

                            'Dim BackendHubNotifiedTsk As Task(Of Integer) = Task.Run(Async Function()
                            '    Dim BackendHubNotified = Await Sql.ExecNonQueryAsync(_Db,
                            '        $"UPDATE `cryptochestmax`.`BashJob` SET `IsBackendHubNotified`= 1, " &
                            '        $"`LastUpdate`=Now() WHERE `i`={NextJob.i}")
                            '    Return BackendHubNotified
                            'End Function)
                            '_logger.LogInformation($"BackendHubNotification Job Finish ({Counter.ToString}) {Now.ToString} {BackendHubNotifiedTsk.Result} row updated")
                            _logger.LogInformation($"BackendHubNotification Job Finish ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}]")
                        Else
                            _logger.LogInformation($"BackendHubNotification Job Finish With Error ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}], BackendHub Response: {Response}")
                        End If
                    Catch ex As WebException
                        Dim Resp As String = ""
                        Dim Stream = ex.Response?.GetResponseStream()
                        If Stream IsNot Nothing Then
                            Dim Sr = New IO.StreamReader(Stream)
                            Resp = Sr.ReadToEnd
                        End If
                        _logger.LogInformation($"BackendHubNotification Job Error ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}], BackendHub Response: {Resp & vbCrLf & ex.Message}")
                    End Try
                Else
                    _logger.LogInformation($"BackendHubNotification Job ({Counter.ToString}) {Now.ToString} Query empty")
                End If
            Else
                _logger.LogInformation($"BackendHubNotification Job Query Error {GetNextJobTsk.Result.Item2.Message}")
            End If
            Return Task.CompletedTask
        End SyncLock
    End Function

End Class
As a result of these 4 jobs we PING each SignalR client and can see in the log what is stored in our NotificationCacheService. And, of course, we can execute SSH commands one by one and (if the clients are still alive) notify the Backend endpoint about each finished job.