Skip to content

Commit

Permalink
Merge branch 'master' into cassandra-branch
Browse files Browse the repository at this point in the history
  • Loading branch information
keremvaris authored Jun 30, 2024
2 parents ed64f64 + b7c3ce3 commit 57626b7
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 95 deletions.
Binary file added .DS_Store
Binary file not shown.
3 changes: 2 additions & 1 deletion Business/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using System.Reflection;
using System.Security.Claims;
using System.Security.Principal;
using Core.Utilities.MessageBrokers;
using Core.Utilities.TaskScheduler;
using Core.Utilities.TaskScheduler.Hangfire;
using Core.Utilities.TaskScheduler.Hangfire.Models;
Expand Down Expand Up @@ -78,7 +79,7 @@ ClaimsPrincipal GetPrincipal(IServiceProvider sp) =>
services.AddTransient<ITokenHelper, JwtHelper>();
services.AddTransient<IElasticSearch, ElasticSearchManager>();

services.AddTransient<IMessageBrokerHelper, MqQueueHelper>();
services.AddTransient<IMessageBrokerHelper, RMqQueueHelper>();
services.AddTransient<IMessageConsumer, MqConsumerHelper>();

var taskSchedulerConfig = Configuration.GetSection("TaskSchedulerOptions").Get<TaskSchedulerConfig>();
Expand Down
2 changes: 2 additions & 0 deletions Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PackageReference Include="Autofac.Extras.DynamicProxy" Version="6.0.1" />
<PackageReference Include="AutoMapper" Version="12.0.1" />
<PackageReference Include="CassandraCSharpDriver" Version="3.20.1" />
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="FluentValidation" Version="11.4.0" />
<PackageReference Include="Hangfire" Version="1.8.11" />
<PackageReference Include="Hangfire.Autofac" Version="2.6.0" />
Expand Down Expand Up @@ -42,6 +43,7 @@
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
<PackageReference Include="Refit" Version="6.3.2" />
<PackageReference Include="Serilog" Version="2.12.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1"/>
<PackageReference Include="Serilog.Formatting.Elasticsearch" Version="9.0.0" />
<PackageReference Include="Serilog.Sinks.Http" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.MicrosoftTeams" Version="0.2.1" />
Expand Down
14 changes: 14 additions & 0 deletions Core/CrossCuttingConcerns/Logging/Serilog/Loggers/ConsoleLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Serilog;

namespace Core.CrossCuttingConcerns.Logging.Serilog.Loggers;

public class ConsoleLogger : LoggerServiceBase
{
public ConsoleLogger()
{
var seriLogConfig = new LoggerConfiguration()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
.CreateLogger();
Logger = seriLogConfig;
}
}
9 changes: 9 additions & 0 deletions Core/Utilities/MessageBrokers/IMessageBrokerHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;
using Core.Utilities.Results;

namespace Core.Utilities.MessageBrokers;

public interface IMessageBrokerHelper
{
Task<IResult> QueueMessageAsync<T>(T messageModel);
}
6 changes: 6 additions & 0 deletions Core/Utilities/MessageBrokers/IMessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Core.Utilities.MessageBrokers;

public interface IMessageConsumer
{
void GetQueue();
}
51 changes: 51 additions & 0 deletions Core/Utilities/MessageBrokers/Kafka/KafkaMessageBroker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

using System.Threading.Tasks;
using Confluent.Kafka;
using Core.Utilities.IoC;
using Core.Utilities.MessageBrokers.RabbitMq;
using Core.Utilities.Results;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;

namespace Core.Utilities.MessageBrokers.Kafka;

public class KafkaMessageBroker : IMessageBrokerHelper
{
private readonly MessageBrokerOptions _kafkaOptions;

public KafkaMessageBroker()
{
var configuration = ServiceTool.ServiceProvider.GetService<IConfiguration>();
if (configuration != null)
_kafkaOptions = configuration
.GetSection("MessageBrokerOptions").Get<MessageBrokerOptions>();
}

public async Task<IResult> QueueMessageAsync<T>(T messageModel)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{_kafkaOptions.HostName}:{_kafkaOptions.Port}",
Acks = Acks.All
};

var message = JsonConvert.SerializeObject(messageModel);
var topicName = typeof(T).Name;
using var p = new ProducerBuilder<Null, string>(producerConfig).Build();
try
{
await p.ProduceAsync(topicName
, new Message<Null, string>
{
Value = message
});
return new SuccessResult();
}

catch (ProduceException<Null, string> e)
{
return new ErrorResult(e.Message);
}
}
}
9 changes: 9 additions & 0 deletions Core/Utilities/MessageBrokers/MessageBrokerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Core.Utilities.MessageBrokers;

public class MessageBrokerOptions
{
public string HostName { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }
}

This file was deleted.

7 changes: 0 additions & 7 deletions Core/Utilities/MessageBrokers/RabbitMq/IMessageConsumer.cs

This file was deleted.

10 changes: 0 additions & 10 deletions Core/Utilities/MessageBrokers/RabbitMq/MessageBrokerOptions.cs

This file was deleted.

82 changes: 42 additions & 40 deletions Core/Utilities/MessageBrokers/RabbitMq/MqConsumerHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,53 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Core.Utilities.MessageBrokers.RabbitMq
namespace Core.Utilities.MessageBrokers.RabbitMq;

public class MqConsumerHelper : IMessageConsumer
{
public class MqConsumerHelper : IMessageConsumer
{
private readonly IConfiguration _configuration;
private readonly MessageBrokerOptions _brokerOptions;
private readonly MessageBrokerOptions _brokerOptions;
private readonly IConfiguration _configuration;

public MqConsumerHelper(IConfiguration configuration)
{
_configuration = configuration;
_brokerOptions = _configuration.GetSection("MessageBrokerOptions").Get<MessageBrokerOptions>();
}
public MqConsumerHelper(IConfiguration configuration)
{
_configuration = configuration;
_brokerOptions = _configuration.GetSection("MessageBrokerOptions").Get<MessageBrokerOptions>();
}

public void GetQueue()
{
var factory = new ConnectionFactory()
public void GetQueue()
{
using var connection = new ConnectionFactory()
{
HostName = _brokerOptions.HostName,
Port = _brokerOptions.Port,
UserName = _brokerOptions.UserName,
Password = _brokerOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(
queue: "DArchQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, mq) =>
{
var body = mq.Body.ToArray();
var message = Encoding.UTF8.GetString(body);

Console.WriteLine($"Message: {message}");
};

channel.BasicConsume(
queue: "DArchQueue",
autoAck: true,
consumer: consumer);
Console.ReadKey();
}
Password = _brokerOptions.Password,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = new TimeSpan(2000),
}
.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(
"DArchQueue",
false,
false,
false,
null);

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, mq) =>
{
var body = mq.Body.ToArray();
var message = Encoding.UTF8.GetString(body);

Console.WriteLine($"Message: {message}");
};

channel.BasicConsume(
"DArchQueue",
true,
consumer);
Console.ReadKey();
}
}
68 changes: 38 additions & 30 deletions Core/Utilities/MessageBrokers/RabbitMq/MqQueueHelper.cs
Original file line number Diff line number Diff line change
@@ -1,43 +1,51 @@
using System.Text;
using System;
using System.Text;
using System.Threading.Tasks;
using Core.Utilities.Results;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace Core.Utilities.MessageBrokers.RabbitMq
namespace Core.Utilities.MessageBrokers.RabbitMq;

public class RMqQueueHelper : IMessageBrokerHelper
{
public class MqQueueHelper : IMessageBrokerHelper
{
private readonly MessageBrokerOptions _brokerOptions;
private readonly MessageBrokerOptions _brokerOptions;

public MqQueueHelper(IConfiguration configuration)
{
Configuration = configuration;
_brokerOptions = Configuration.GetSection("MessageBrokerOptions").Get<MessageBrokerOptions>();
}
public RMqQueueHelper(IConfiguration configuration)
{
Configuration = configuration;
_brokerOptions = Configuration.GetSection("MessageBrokerOptions").Get<MessageBrokerOptions>();
}

public IConfiguration Configuration { get; }
public IConfiguration Configuration { get; }

public void QueueMessage(string messageText)
{
var factory = new ConnectionFactory
public Task<IResult> QueueMessageAsync<T>(T messageModel)
{
using var connection = new ConnectionFactory()
{
HostName = _brokerOptions.HostName,
Port = _brokerOptions.Port,
UserName = _brokerOptions.UserName,
Password = _brokerOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(
queue: "DArchQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

var message = JsonConvert.SerializeObject(messageText);
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: string.Empty, routingKey: "DArchQueue", basicProperties: null, body: body);
}
Password = _brokerOptions.Password,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = new TimeSpan(2000),
}
.CreateConnection();
using var channel = connection.CreateModel();
var topicName = typeof(T).Name;
channel.QueueDeclare(
topicName,
false,
false,
false,
null);

var message = JsonConvert.SerializeObject(messageModel);
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(string.Empty, topicName, null, body);
return Task.FromResult<IResult>(new SuccessResult());

}
}

0 comments on commit 57626b7

Please sign in to comment.