(CORE) CORE (2022)

Asynchronous MultiThreaded SSH engine for Web (Net Core 6, Linux) - Part 7,8 (QuartzService/Jobs and Singleton CacheService). ConcurrentDictionary, Interlocked.Increment, SyncLock, Closure variable for MultiThreading, Cron Configuration.

7. Singleton CacheService.

On one side, the described component saves every SignalR ConnectionID together with its UserID in a dedicated singleton service named NotificationCacheService. On the other side, every notification (with the truncated result of the SSH command) is also stored in the same NotificationCacheService. This is the interface INotificationCacheService.



And this is the implementation. It is pretty simple: in general it only stores and deletes these two entities in MyConcurrentDictionary.



What is MyConcurrentDictionary? It is an ordinary thread-safe ConcurrentDictionary with one additional method that I use in many places in the current component, the Asynchronous MultiThreaded SSH engine for Web.
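The class itself is not reproduced in this article, so the following is only a minimal sketch of what such a dictionary could look like. It assumes that the additional method is PrintKeys (the job code in section 8 calls WorkingServer.PrintKeys in its log messages) and that it returns the keys as one comma-separated string; both the body of the class and the Demo module are hypothetical.

```vbnet
Imports System
Imports System.Collections.Concurrent

' Hypothetical reconstruction: the original MyConcurrentDictionary is not shown in the article.
Public Class MyConcurrentDictionary(Of TKey, TValue)
    Inherits ConcurrentDictionary(Of TKey, TValue)

    ' Returns a snapshot of the current keys as a comma-separated string (handy for logging).
    Public Function PrintKeys() As String
        Return String.Join(Of TKey)(",", Me.Keys)
    End Function
End Class

Module Demo
    Sub Main()
        Dim Working As New MyConcurrentDictionary(Of Integer, String)

        ' TryAdd succeeds only for a key that is not present yet,
        ' so it can serve as a "this server is already busy" guard.
        Dim First As Boolean = Working.TryAdd(1, "uptime")
        Dim Second As Boolean = Working.TryAdd(1, "reboot")
        Console.WriteLine($"First add: {First}, second add: {Second}")
        Console.WriteLine($"Keys: {Working.PrintKeys()}")

        ' TryRemove frees the key again when the work is finished.
        Dim Removed As String = Nothing
        Working.TryRemove(1, Removed)
        Console.WriteLine($"Keys after remove: '{Working.PrintKeys()}'")
    End Sub
End Module
```

Because TryAdd is atomic, only one of several competing threads can register the same key, which is the property the SSH job relies on to run one command per server at a time.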



8. QuartzService/Jobs. ConcurrentDictionary, Interlocked.Increment, SyncLock, Closure variable for MultiThreading, Cron Configuration.

Now we are ready to move on to the main part of the component I am describing: the Quartz service and the four jobs that the Asynchronous MultiThreaded SSH engine for Web needs in order to work. The general Quartz documentation is at https://www.quartz-scheduler.net/documentation/quartz-3.x/quick-start.html.

Two years ago I uploaded general project templates for a Quartz service and jobs to GitHub https://github.com/ViacheslavUKR/VB-NET-ASP-NET-Core-3.1.Quartz.Service and to Visual Studio Marketplace https://marketplace.visualstudio.com/items?itemName=vb-net-com.QuartzProjectTemplate.

This component contains a practical implementation of that general idea. But first, we must not forget what is needed to configure the Quartz service at all.

First, we need to add a reference to the project and add the service to the DI container.



Then we need to configure QuartzHostedService and the classes JobFactory and JobSchedule. That's it; I did not even need to work with Triggers separately in this project.



We also need to configure the Cron control variable. There are a lot of sites that help to create a correct expression.

Then we need to add the correct Cron expression to each Job.
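As a reminder of how such an expression is laid out: a Quartz cron expression has six mandatory fields (seconds, minutes, hours, day-of-month, month, day-of-week) and an optional seventh field for the year. The small helper below is only an illustration and is not part of the component; the module name CronFieldsDemo and the function Describe are hypothetical. It simply labels each field of an expression so that a mistake in field order is easy to spot.

```vbnet
Imports System

Module CronFieldsDemo
    ' Field names of a Quartz cron expression, in order; the 7th field (year) is optional.
    Private ReadOnly FieldNames As String() =
        {"seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"}

    ' Splits an expression on spaces and labels every field with its name.
    Function Describe(ByVal Expression As String) As String
        Dim Parts = Expression.Split({" "c}, StringSplitOptions.RemoveEmptyEntries)
        If Parts.Length < 6 OrElse Parts.Length > 7 Then
            Throw New ArgumentException("A Quartz cron expression has 6 or 7 fields.")
        End If
        Dim Labeled(Parts.Length - 1) As String
        For I As Integer = 0 To Parts.Length - 1
            Labeled(I) = $"{FieldNames(I)}={Parts(I)}"
        Next
        Return String.Join(", ", Labeled)
    End Function

    Sub Main()
        Console.WriteLine(Describe("0/5 * * * * ?"))   ' every 5 seconds
        Console.WriteLine(Describe("0 0/1 * * * ?"))   ' at second 0 of every minute
        Console.WriteLine(Describe("0 15 * ? * * *"))  ' at minute 15 of every hour
    End Sub
End Module
```

The helper only checks the number of fields; whether the values themselves are valid is still decided by Quartz when the schedule is created.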



The main point of working with a Quartz job is that this is multithreaded code, and the next job run usually starts while the previous run has not finished yet.
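The effect can be reproduced without Quartz. The sketch below is a hypothetical stand-in (the class OverlapDemoJob is not part of the component): five threads call the same Execute method at almost the same time, Interlocked.Increment counts the calls safely, and SyncLock guarantees that only one thread at a time is inside the critical section.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

' Hypothetical stand-in for a job class; it does not reference Quartz.
Public Class OverlapDemoJob
    Private Counter As Integer
    Private ReadOnly Gate As New Object
    Private InsideGate As Integer
    Public MaxInsideGate As Integer

    Public Sub Execute()
        ' Atomic increment: safe even when several threads arrive together.
        Interlocked.Increment(Counter)
        SyncLock Gate
            ' Only one thread at a time can run this section.
            InsideGate += 1
            MaxInsideGate = Math.Max(MaxInsideGate, InsideGate)
            Thread.Sleep(20) ' simulate work that outlasts the start of the next run
            InsideGate -= 1
        End SyncLock
    End Sub

    Public ReadOnly Property Runs As Integer
        Get
            Return Counter
        End Get
    End Property
End Class

Module OverlapDemo
    Sub Main()
        Dim Job As New OverlapDemoJob
        Dim Threads As New List(Of Thread)
        For I As Integer = 1 To 5
            Dim T As New Thread(AddressOf Job.Execute)
            Threads.Add(T)
            T.Start()
        Next
        Threads.ForEach(Sub(T) T.Join())
        Console.WriteLine($"Runs: {Job.Runs}, max threads inside SyncLock: {Job.MaxInsideGate}")
    End Sub
End Module
```

Without the SyncLock the value of MaxInsideGate could grow above 1, which is exactly the situation a job has to be prepared for when runs overlap.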



Therefore the first step is to debug your algorithm in simple single-thread mode. For this purpose I used the following job initialization, which starts the job at the next minute.


        services.AddSingleton(Of SShJob)
        ' Schedule the job for the next minute; Mod 60 keeps the cron minute field in the range 0-59.
        Dim NextMin As Integer = (Now.Minute + 1) Mod 60
        Dim SshJobShed = New JobSchedule(GetType(SShJob), $"0 {NextMin} * ? * * *")
        services.AddSingleton(SshJobShed)

My component has four jobs now.

The programming technique for a multithreaded module is very particular. Usually I use a technique with a Closure variable, which you can see in the code below. You can also see how I create a queue for Servers and VMs and run my SshEngine.



    Imports System
    Imports System.Net.Http
    Imports System.Text
    Imports System.Threading
    Imports BackendAPI.Helper
    Imports BackendAPI.KVM
    Imports BackendAPI.Vm
    Imports BackendAPI.Model
    Imports BackendAPI.Services
    Imports Microsoft.Extensions.DependencyInjection
    Imports Microsoft.Extensions.Logging
    Imports Newtonsoft.Json
    Imports Quartz
    Imports Quartz.Spi
    Imports System.Collections.Concurrent
    Imports BackendAPI.Notification

    Public Class SShJob
        Implements IJob

        Private ReadOnly _logger As ILogger(Of SShJob)
        Private ReadOnly _Aes As AesCryptor
        Private ReadOnly _Db As ApplicationDbContext
        Private Counter As Integer
        Public Sub New(ByVal logger As ILogger(Of SShJob), Cryptor As IAesCryptor, DbContext As ApplicationDbContext)
            _logger = logger
            _Aes = Cryptor
            _Db = DbContext
            WorkingServer = New MyConcurrentDictionary(Of Integer, BashJob)
            WorkingVm = New MyConcurrentDictionary(Of Integer, BashJob)
        End Sub

        ' Servers and VMs that currently have an SSH command in progress
        Private WorkingServer As MyConcurrentDictionary(Of Integer, BashJob)
        Private WorkingVm As MyConcurrentDictionary(Of Integer, BashJob)
        Private RequestNextJob As New Object 'multithreading code !!!

        Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
            Interlocked.Increment(Counter)
            '_logger.LogInformation($"SshJob Started {Now}, InstanceID {context.FireInstanceId}")
            ' Only one job run at a time may take the next command from the queue
            SyncLock RequestNextJob
                Dim GetNextJobTsk = BashJob.GetNextJob(_Db)
                If GetNextJobTsk.Result.Item2 Is Nothing Then
                    If GetNextJobTsk.Result.Item1.Count > 0 Then
                        Dim NextJob As BashJob = GetNextJobTsk.Result.Item1(0)
                        If NextJob.toVm = 0 Then
                            _logger.LogInformation($"Job for Server {NextJob.toServer} will be processed")
                            Dim Val1 As BashJob
                            Dim ServerWorking As Boolean = WorkingServer.TryGetValue(NextJob.toServer, Val1)
                            If Not ServerWorking Then
                                Dim AddSucess1 = WorkingServer.TryAdd(NextJob.toServer, NextJob)
                                If AddSucess1 Then
                                    ' Each worker thread receives its own Closure instance
                                    Dim ServerThread = New Thread(Sub() ServerJob(NextJob, New ServerClosure))
                                    ServerThread.Start()
                                Else
                                    Return Task.CompletedTask
                                End If
                            Else
                                Return Task.CompletedTask
                            End If
                        ElseIf NextJob.toServer = 0 Then
                            _logger.LogInformation($"Job for Vm {NextJob.toVm} will be processed")
                            Dim Val2 As BashJob
                            Dim VmWorking = WorkingVm.TryGetValue(NextJob.toVm, Val2)
                            If Not VmWorking Then
                                Dim AddSucess1 = WorkingVm.TryAdd(NextJob.toVm, NextJob)
                                If AddSucess1 Then
                                    Dim VmThread = New Thread(Sub() VmJob(NextJob, New VmClosure))
                                    VmThread.Start()
                                Else
                                    Return Task.CompletedTask
                                End If
                            Else
                                Return Task.CompletedTask
                            End If
                        End If
                    Else
                        _logger.LogInformation($"SshJob ({Counter.ToString}) {Now.ToString} Query empty")
                    End If
                Else
                    _logger.LogInformation($"SshJob Query error {GetNextJobTsk.Result.Item2.Message}")
                End If
            End SyncLock
            ' Always hand a completed (non-Nothing) Task back to Quartz
            Return Task.CompletedTask
        End Function

        Async Sub ServerJob(ByVal Prm As BashJob, ByVal Closure As ServerClosure)
            Try
                _logger.LogInformation($"ServerSsh JobStart ({Counter.ToString}) {Now.ToString}, Server:{Prm.toServer}, {Prm.i}:[{Prm.Command}], WorkingServer {WorkingServer.PrintKeys}")
                Closure.MarkStartTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsStarted`=1 WHERE `i`={Prm.i}")
                Await Closure.MarkStartTsk
                Closure.Server = New ServerBashAsync(_Db, _Aes, Prm.toServer, Prm.SshDecryptPass)
                Closure.Connect = Closure.Server.SSHServerConnect()
                If Closure.Connect.Item1 IsNot Nothing Then
                    Closure.ServerBashRetTsk = Closure.Server.Bash(Prm.Command)
                    Await Closure.ServerBashRetTsk

                    ' Replace quote characters so the output can be embedded in the SQL string
                    Closure.Server.SshErrMsg.ForEach(Sub(X) Closure.ClearErr.Add(X.Replace("'", "^").Replace("`", "~")))
                    Closure.Server.SshOutput.ForEach(Sub(X) Closure.ClearOutput.Add(X.Replace("'", "^").Replace("`", "~")))
                    Closure.IsCompletedTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1," &
                                          $"`Result`='{Left(String.Join(Environment.NewLine, Closure.ClearOutput), 65000)}'," &
                                          $"`Error`='{Left(Closure.ServerBashRetTsk.Result & vbCrLf & String.Join(Environment.NewLine, Closure.ClearErr), 65000)}'," &
                                          $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                    Await Closure.IsCompletedTsk
                Else
                    Closure.IsCompletedWithErrTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1,`Error`='Not connected'," &
                                                                          $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                    Await Closure.IsCompletedWithErrTsk
                End If
                WorkingServer.TryRemove(Prm.toServer, Prm)
                _logger.LogInformation($"ServerSsh JobFinish ({Counter.ToString}) {Now.ToString}, Server:{Prm.toServer}, {Prm.i}:[{Prm.Command}], WorkingServer { WorkingServer.PrintKeys}")
            Catch ex As Exception
                _logger.LogInformation($"ServerSsh ({Counter.ToString}) {Now.ToString} Server:{Prm.toServer} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
            Finally
                ' Free the server even if the command failed
                WorkingServer.TryRemove(Prm.toServer, Prm)
            End Try
        End Sub

        Async Sub VmJob(ByVal Prm As BashJob, ByVal Closure As VmClosure)
            Try
                Closure.GetVmNameTsk = _Db.RawSqlQueryAsync(Of String)($"Select `Name` FROM cryptochestmax.`VM` WHERE `i`={Prm.toVm};", Function(X) X("Name"))
                Await Closure.GetVmNameTsk
                If Closure.GetVmNameTsk.Result.Item2 Is Nothing Then
                    _logger.LogInformation($"VmSsh JobStart ({Counter.ToString}) {Now.ToString}, Vm{Prm.toVm}, {Prm.i}:[{Prm.Command}], WorkingVm {WorkingVm.PrintKeys}")
                    Closure.MarkStartTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsStarted`=1 WHERE `i`={Prm.i}")
                    Await Closure.MarkStartTsk
                    Closure.Vm = New VmBashAsync(_Db, _Aes, Closure.GetVmNameTsk.Result.Item1(0), Prm.SshDecryptPass)
                    Closure.Connect = Closure.Vm.SSHServerConnect()
                    If Closure.Connect.Item1 IsNot Nothing Then
                        Closure.VmBashRetTsk = Closure.Vm.Bash(Prm.Command)
                        Await Closure.VmBashRetTsk

                        Closure.Vm.SshErrMsg.ForEach(Sub(X) Closure.ClearErr.Add(X.Replace("'", "^").Replace("`", "~")))
                        Closure.Vm.SshOutput.ForEach(Sub(X) Closure.ClearOutput.Add(X.Replace("'", "^").Replace("`", "~")))
                        Closure.IsCompletedTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1," &
                                                                       $"`Result`='{Left(String.Join(Environment.NewLine, Closure.ClearOutput), 65000)}'," &
                                                                       $"`Error`='{Left(Closure.VmBashRetTsk.Result & vbCrLf & String.Join(Environment.NewLine, Closure.ClearErr), 65000)}'," &
                                                                       $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                        Await Closure.IsCompletedTsk
                    Else
                        Closure.IsCompletedWithErrTsk = Sql.ExecNonQueryAsync(_Db, $"UPDATE `cryptochestmax`.`BashJob` SET `IsCompleted`=1,`Error`='Not connected'," &
                                                                              $"`LastUpdate`=Now() WHERE `i`={Prm.i}")
                        Await Closure.IsCompletedWithErrTsk
                    End If
                Else
                    _logger.LogInformation($"VmSsh ({Counter.ToString}) {Now.ToString} Vm {Prm.toVm} is nothing")
                End If
                WorkingVm.TryRemove(Prm.toVm, Prm)
                _logger.LogInformation($"VmSsh JobFinish ({Counter.ToString}) {Now.ToString}, Vm:{Prm.toVm}, {Prm.i}:[{Prm.Command}], WorkingVm {WorkingVm.PrintKeys}")
            Catch ex As Exception
                _logger.LogInformation($"VmSsh ({Counter.ToString}) {Now.ToString} Vm:{Prm.toVm} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
            Finally
                ' Free the VM even if the command failed
                WorkingVm.TryRemove(Prm.toVm, Prm)
            End Try
        End Sub
    End Class
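The Closure-variable technique used in SShJob can be reduced to a few lines. The classes ServerClosure and VmClosure are not shown in this article, so the sketch below only assumes the general idea: every thread receives its own state object, and all intermediate results live in that object instead of in shared fields of the job class. The names WorkClosure, ClosureDemo and Worker are hypothetical.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

' Hypothetical per-thread state holder, similar in spirit to ServerClosure.
Public Class WorkClosure
    Public Output As New List(Of String)
    Public Result As Integer
End Class

Module ClosureDemo
    Sub Worker(ByVal Id As Integer, ByVal Closure As WorkClosure)
        ' All intermediate state lives in the Closure instance owned by this thread,
        ' so no locking is needed while the work is in progress.
        For I As Integer = 1 To 3
            Closure.Output.Add($"job {Id} step {I}")
            Closure.Result += Id * I
        Next
    End Sub

    Sub Main()
        Dim Closures As New List(Of WorkClosure)
        Dim Threads As New List(Of Thread)
        For Id As Integer = 1 To 4
            Dim JobId As Integer = Id       ' copy, so each lambda captures its own value
            Dim State As New WorkClosure    ' fresh state object for each thread
            Closures.Add(State)
            Dim T As New Thread(Sub() Worker(JobId, State))
            Threads.Add(T)
            T.Start()
        Next
        Threads.ForEach(Sub(T) T.Join())
        For Each C In Closures
            Console.WriteLine($"{C.Result}: {String.Join("; ", C.Output)}")
        Next
    End Sub
End Module
```

Because no two threads ever touch the same WorkClosure, the results stay separated even though all threads run the same Worker code at the same time.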

The other jobs are simpler.


    Imports System
    Imports System.Net.Http
    Imports System.Text
    Imports System.Threading
    Imports BackendAPI.Model
    Imports BackendAPI.Notification
    Imports BackendAPI.Services
    Imports Microsoft.Extensions.Configuration
    Imports Microsoft.Extensions.DependencyInjection
    Imports Microsoft.Extensions.Logging
    Imports Newtonsoft.Json
    Imports Quartz
    Imports Quartz.Spi
    Imports BackendAPI
    Imports System.Net
    Imports BackendAPI.KVM
    Imports BackendAPI.Helper

    Public Class NotificationCacheState
        Implements IJob

        Private ReadOnly NotificationCacheStateUrl As String
        Private ReadOnly _logger As ILogger(Of NotificationCacheState)
        Private ReadOnly _Db As ApplicationDbContext
        Private Counter As Integer
        Private NextRequest As New Object 'multithreading code !!!
        Private ReadOnly Request As MyWebClient
        Private ReadOnly NotificationTokenLogin As String
        Private ReadOnly NotificationTokenPass As String
        Public Sub New(ByVal logger As ILogger(Of NotificationCacheState), DbContext As ApplicationDbContext, Configuration As IConfiguration)
            _logger = logger
            _Db = DbContext
            Request = New MyWebClient
            Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
            Request.Headers.Add("Content-Type", "application/json")
            NotificationCacheStateUrl = Configuration.GetValue(Of String)("Backend:NotificationCacheState")
            NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
            NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")

        End Sub
        Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
            Interlocked.Increment(Counter)
            ' The shared web client is used by one job run at a time
            SyncLock NextRequest
                Request.Headers.Add("Content-Type", "application/json")
                Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
                Request.Headers.Clear()
                Request.Headers.Add("Authorization", "Bearer: " & Token)
                Request.Headers.Add("Content-Type", "application/json")
                Try
                    Dim Response = Request.DownloadString(NotificationCacheStateUrl)
                    ' Log only when the cache is not empty
                    If Response <> "{""notificationKey"":""No"",""signalRConnectionKeys"":""No""}" Then
                        _logger.LogInformation($"NotificationCacheState ({Counter.ToString}) {Now.ToString},  {Response}")
                    End If
                Catch ex As WebException
                    Dim Resp As String = ""
                    Dim Stream = ex.Response?.GetResponseStream()
                    If Stream IsNot Nothing Then
                        Dim Sr = New IO.StreamReader(Stream)
                        Resp = Sr.ReadToEnd
                    End If
                    _logger.LogInformation($"NotificationCacheState Error ({Counter.ToString}) {Now.ToString}, BackendHub Response: {Resp & vbCrLf & ex.Message}")
                End Try
            End SyncLock
            ' Always hand a completed (non-Nothing) Task back to Quartz
            Return Task.CompletedTask
        End Function
    End Class

    Imports System
    Imports System.Net.Http
    Imports System.Text
    Imports System.Threading
    Imports BackendAPI.Model
    Imports BackendAPI.Notification
    Imports BackendAPI.Services
    Imports Microsoft.Extensions.Configuration
    Imports Microsoft.Extensions.DependencyInjection
    Imports Microsoft.Extensions.Logging
    Imports Newtonsoft.Json
    Imports Quartz
    Imports Quartz.Spi
    Imports BackendAPI
    Imports System.Net
    Imports BackendAPI.Kvm
    Imports BackendAPI.Helper

    Public Class SendTestNotificationToClient
        Implements IJob

        Private ReadOnly _logger As ILogger(Of SendTestNotificationToClient)
        Private Counter As Integer
        Private ReadOnly _TestUrl As String
        Private ReadOnly NotificationTokenLogin As String
        Private ReadOnly NotificationTokenPass As String
        Private ReadOnly Request As MyWebClient
        Private RequestNextJob As New Object 'multithreading code !!!

        Public Sub New(ByVal logger As ILogger(Of SendTestNotificationToClient), Cryptor As IAesCryptor, DbContext As ApplicationDbContext, Configuration As IConfiguration)
            _logger = logger
            _TestUrl = Configuration.GetValue(Of String)("Backend:TestNotification")
            NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
            NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")
            Request = New MyWebClient
            Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
            Request.Headers.Add("Content-Type", "application/json")
        End Sub

        Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
            Interlocked.Increment(Counter)
            ' The shared web client is used by one job run at a time
            SyncLock RequestNextJob
                Request.Headers.Add("Content-Type", "application/json")
                Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
                Request.Headers.Clear()
                Request.Headers.Add("Authorization", "Bearer: " & Token)
                Request.Headers.Add("Content-Type", "application/json")
                Try
                    Dim Response = Request.DownloadString(_TestUrl)
                    _logger.LogInformation(Response)
                    ' Note: Response is a String here, so GetType.Name is always "String"
                    ' and neither of the two branches below is taken.
                    If Response.GetType.Name = "OkObjectResult" Then
                        _logger.LogInformation($"SendTestNotificationToClient Job Finish ({Counter.ToString}), InstanceID {context.FireInstanceId},  {Now.ToString} ")
                    ElseIf Response.GetType.Name = "ObjectResult" Then
                        _logger.LogInformation($"SendTestNotificationToClient Job Notification Error ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString}")
                    End If
                Catch ex As WebException
                    Dim Resp As String = ""
                    Dim Stream = ex.Response?.GetResponseStream()
                    If Stream IsNot Nothing Then
                        Dim Sr = New IO.StreamReader(Stream)
                        Resp = Sr.ReadToEnd
                    End If
                    _logger.LogInformation($"BackendHubNotification Job Error ({Counter.ToString}), InstanceID {context.FireInstanceId}, {Now.ToString}, BackendHub Response: {Resp & vbCrLf & ex.Message}")
                End Try
            End SyncLock
            ' Always hand a completed (non-Nothing) Task back to Quartz
            Return Task.CompletedTask
        End Function
    End Class

    Imports System
    Imports System.Net.Http
    Imports System.Text
    Imports System.Threading
    Imports BackendAPI.Model
    Imports BackendAPI.Notification
    Imports BackendAPI.Services
    Imports Microsoft.Extensions.Configuration
    Imports Microsoft.Extensions.DependencyInjection
    Imports Microsoft.Extensions.Logging
    Imports Newtonsoft.Json
    Imports Quartz
    Imports Quartz.Spi
    Imports BackendAPI
    Imports System.Net
    Imports BackendAPI.KVM
    Imports BackendAPI.Helper

    'https://stackoverflow.com/questions/9343594/how-to-call-asynchronous-method-from-synchronous-method-in-c
    Public Class BackendHubNotificationJob
        Implements IJob

        Private ReadOnly _NotificationUrl As String
        Private ReadOnly _logger As ILogger(Of BackendHubNotificationJob)
        Private ReadOnly _Aes As AesCryptor
        Private ReadOnly _Db As ApplicationDbContext
        Private Counter As Integer
        Private ReadOnly Request As MyWebClient
        Private RequestNextJob As New Object 'multithreading code !!!
        Private ReadOnly NotificationTokenLogin As String
        Private ReadOnly NotificationTokenPass As String
        Public Sub New(ByVal logger As ILogger(Of BackendHubNotificationJob), Configuration As IConfiguration, Cryptor As IAesCryptor, DbContext As ApplicationDbContext)
            _logger = logger
            _NotificationUrl = Configuration.GetValue(Of String)("Backend:NotificationUrl")
            NotificationTokenLogin = Configuration.GetValue(Of String)("NotificationToken:Login")
            NotificationTokenPass = Configuration.GetValue(Of String)("NotificationToken:Password")
            _Aes = Cryptor
            _Db = DbContext
            Request = New MyWebClient
            Request.BaseAddress = Configuration.GetValue(Of String)("Backend:BaseURL")
            Request.Headers.Add("Content-Type", "application/json")
        End Sub

        'multithreading code !!!
        Public Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
            Interlocked.Increment(Counter)
            SyncLock RequestNextJob
                '_logger.LogInformation($"BackendHubNotification JobStarted ({Counter.ToString}), InstanceID {context.FireInstanceId},  {Now.ToString} ")
                ' Take one completed job whose result has not been sent to the BackendHub yet
                Dim GetNextJobTsk = BashJob.GetOneNextNotifiedJob(_Db, "`IsCompleted`=1 and `IsBackendHubNotified`=0 and `IsSubscriberError` is NULL and `IsSubscriberNotified` is NULL and not (`SubscribeId` is NULL)")
                If GetNextJobTsk.Result.Item2 Is Nothing Then
                    If GetNextJobTsk.Result.Item1.Count > 0 Then
                        Dim NextJob As BashJobFinishedRequest = GetNextJobTsk.Result.Item1(0)
                        _logger.LogInformation($"BackendHubNotification ({Counter.ToString}) found notification {NextJob.i}.")
                        Request.Headers.Add("Content-Type", "application/json")
                        Dim Token = GetNotificationToken(Request, NotificationTokenLogin, NotificationTokenPass)
                        Request.Headers.Clear()
                        Request.Headers.Add("Authorization", "Bearer: " & Token)
                        Request.Headers.Add("Content-Type", "application/json")
                        Dim PostPrm = New BashJobFinishedRequest With {
                                    .i = NextJob.i,
                                    .CrDate = NextJob.CrDate,
                                    .toServer = NextJob.toServer,
                                    .toVm = NextJob.toVm,
                                    .toUser = NextJob.toUser,
                                    .SubscribeId = NextJob.SubscribeId,
                                    .Command = NextJob.Command,
                                    .Comment = NextJob.Comment,
                                    .LastUpdate = NextJob.LastUpdate}
                        Dim PostData = JsonConvert.SerializeObject(PostPrm)
                        Try
                            Dim Response = Encoding.UTF8.GetString(Request.UploadData(_NotificationUrl, Encoding.UTF8.GetBytes(PostData)))
                            _logger.LogInformation(Response)
                            If Response.StartsWith("{""error"":"""",") Then '{"error":"","notificationKey":"No","signalRConnectionKeys":"No"}

                                'Dim BackendHubNotifiedTsk As Task(Of Integer) = Task.Run(Async Function()
                                '                                                             Dim BackendHubNotified = Await Sql.ExecNonQueryAsync(_Db,
                                '                                                                                        $"UPDATE `cryptochestmax`.`BashJob` SET `IsBackendHubNotified`= 1, " &
                                '                                                                                        $"`LastUpdate`=Now() WHERE `i`={NextJob.i}")
                                '                                                             Return BackendHubNotified
                                '                                                         End Function)
                                '_logger.LogInformation($"BackendHubNotification Job Finish ({Counter.ToString}) {Now.ToString} {BackendHubNotifiedTsk.Result} row updated")
                                _logger.LogInformation($"BackendHubNotification Job Finish ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}]")
                            Else
                                _logger.LogInformation($"BackendHubNotification Job Finish With Error ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}], BackendHub Response: {Response}")
                            End If
                        Catch ex As WebException
                            Dim Resp As String = ""
                            Dim Stream = ex.Response?.GetResponseStream()
                            If Stream IsNot Nothing Then
                                Dim Sr = New IO.StreamReader(Stream)
                                Resp = Sr.ReadToEnd
                            End If
                            _logger.LogInformation($"BackendHubNotification Job Error ({Counter.ToString}) {Now.ToString}, Server:{NextJob.toServer}, Vm:{NextJob.toVm}, User:{NextJob.toUser}, {NextJob.i}:[{NextJob.Command}], BackendHub Response: {Resp & vbCrLf & ex.Message}")
                        End Try
                    Else
                        _logger.LogInformation($"BackendHubNotification Job ({Counter.ToString}) {Now.ToString} Query empty")
                    End If
                Else
                    _logger.LogInformation($"BackendHubNotification Job Query Error {GetNextJobTsk.Result.Item2.Message}")
                End If
                Return Task.CompletedTask
            End SyncLock
        End Function


    End Class

As a result of these four jobs we PING each SignalR client, and we can see in the log what our NotificationCacheService stores. And, of course, we can execute SSH commands one by one and (if the clients are still alive) notify the BackendEndpoint about each finished job.




Link to this page: http://www.vb-net.com/SshQueueNotificationServer/Index3.htm