Не могу понять, почему мой BackgroundService
(.NET Core 3.1) не может корректно завершить свою работу.
Написал сервис, который должен слушать tcp-порт и обрабатывать сообщения определенного формата. Написал следующий BackgroundService
:
internal sealed class TcpListenerBackgroundService : BackgroundService
{
private readonly ITcpPortListener _tcpPortListener;
private readonly ILogger<TcpListenerBackgroundService> _logger;
public TcpListenerBackgroundService(ITcpPortListener tcpListener, ILogger<TcpListenerBackgroundService> logger)
{
_tcpPortListener = tcpListener;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await _tcpPortListener.ListenAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error was occured");
}
}
}
Сюда в качестве ITcpPortListener
внедряется следующий TcpPortListener
:
public sealed class TcpPortListener : ITcpPortListener
{
private readonly ILogger<TcpPortListener> _logger;
private readonly IPacketProcessor _packetProcessor;
private readonly TcpListener _listener;
private TaskCompletionSource<object> _appTerminationSource;
public TcpPortListener(
ITcpPortListenerConfiguration config,
IPacketProcessor packetProcessor,
ILogger<TcpPortListener> logger)
{
_listener = new TcpListener(IPAddress.Any, config.PortNumber);
_packetProcessor = packetProcessor;
_logger = logger;
}
public async Task ListenAsync(CancellationToken stoppingToken)
{
_appTerminationSource = new TaskCompletionSource<object>();
await using (stoppingToken.Register(_appTerminationSource.SetCanceled))
{
var taskList = new List<Task>();
_listener.Start();
while (!stoppingToken.IsCancellationRequested)
{
var acceptClientTask = _listener.AcceptTcpClientAsync();
await Task.WhenAny(acceptClientTask, _appTerminationSource.Task);
if (acceptClientTask.IsCompletedSuccessfully)
{
var client = acceptClientTask.Result;
var processTask = ProcessClientAsync(client, stoppingToken);
taskList.Add(processTask);
}
taskList.RemoveAll(p => p.IsCompleted);
}
_logger.LogInformation("Waiting for all clients termination...");
await Task.WhenAll(taskList.ToArray());
_logger.LogInformation("Terminate listening...");
_listener.Stop();
_logger.LogInformation("Listening was terminated successfully!");
}
}
private async Task ProcessClientAsync(TcpClient client, CancellationToken stoppingToken)
{
var clientIp = GetClientIp(client);
_logger.LogInformation($"Client[{clientIp}]. Connected");
try
{
var stream = client.GetStream();
var socket = client.Client;
_logger.LogInformation($"Client[{clientIp}]. Start processing");
var buffer = new byte[1024];
var packetContainer = new SuntechPacketContainer();
var isSocketConnected = CheckSocketConnection(socket);
while (isSocketConnected && !stoppingToken.IsCancellationRequested)
{
var readTask = stream.ReadAsync(buffer, 0, buffer.Length, stoppingToken);
await Task.WhenAny(readTask, _appTerminationSource.Task);
if (readTask.IsCompletedSuccessfully)
{
var bytesRead = readTask.Result;
if (bytesRead == 0)
{
isSocketConnected = false;
break;
}
packetContainer.Append(buffer.Take(bytesRead));
var packets = packetContainer.FetchFullPackets();
await ProcessPacketsAsync(packets);
isSocketConnected = CheckSocketConnection(socket);
}
}
if (!isSocketConnected)
{
_logger.LogInformation($"Client[{clientIp}]. Socket disconnected. The message processing for the client has been stopped");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Client[{clientIp}]. Unexpected error was occured during client processing");
}
finally
{
_logger.LogInformation($"Client[{clientIp}]. Closing the client connection");
client.Close();
}
}
private static string GetClientIp(TcpClient client)
{
return ((IPEndPoint)client.Client.RemoteEndPoint).Address.ToString();
}
private async Task ProcessPacketsAsync(IEnumerable<byte[]> packets)
{
foreach (var packet in packets)
{
await _packetProcessor.ProcessPacketAsync(packet);
}
}
private static bool CheckSocketConnection(Socket socket)
{
const int connectionTimeoutInMicroseconds = 1_000;
var poll = socket.Poll(connectionTimeoutInMicroseconds, SelectMode.SelectRead);
return !((poll && (socket.Available == 0)) || !socket.Connected);
}
}
Развернул этот сервис на CentOS 7 с помощью systemd
.
Проблема в том, что иногда (не знаю точно, при каких условиях), если к сервису подключен хотя бы один клиент, и в этот момент я захочу остановить сервис (вызвать service stop
), то сервис будет бесконечно завершаться и systemd
повиснет, из-за чего придется убивать ее, после чего, узнав статус сервиса, мне будет сказано, что процесс был убит из-за истечения таймаута ожидания. Я совсем не могу понять, почему так происходит, ведь по логам метод ListenAsync
отработал. Это точно не из-за того, что метод ListenAsync
долго завершается. Прилагаю пример логов:
//...
2020-11-03 04:01:22.299 -05:00 [INF] Application is shutting down...
2020-11-03 04:01:22.333 -05:00 [INF] Client[127.0.0.1]. Closing the client connection
2020-11-03 04:01:22.334 -05:00 [INF] Waiting for all clients termination...
2020-11-03 04:01:22.335 -05:00 [INF] Terminate listening...
2020-11-03 04:01:22.336 -05:00 [INF] Listening was terminated successfully!
Есть ощущение, будто в некоторых случаях не освобождаются все ресурсы. Прошу проинспектировать мой код на наличие возможных неточностей с управлением ресурсами. Мой глаз уже намылен, ничего не видит.
if (readTask.IsCompletedSuccessfully)
вот здесь ошибка. Если будет false
, оно просто пропустит обработку и пойдет читать дальше по циклу, и так оно может запустить миллион тасков, которые будут ждать что-то там из сокета. Мне кажется, надо сделать break
цикла.
Eсли WhenAny
отработал через _appTerminationSource.Task
, вам обязательно надо где-то вызвать await readTask
, чтобы убедиться что операция чтения завершена прежде чем гасить клиент. Возможно оттуда даже исключение выбросится при await
.
Попробуйте такой паттерн
while (isSocketConnected && !stoppingToken.IsCancellationRequested)
{
using CancellationTokenSource localCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var readTask = stream.ReadAsync(buffer, 0, buffer.Length, localCts.Token);
var fastTask = await Task.WhenAny(readTask, _appTerminationSource.Task);
// WhenAny не дождался readTask, надо канселить локальный токен и завершать цикл
// (или не завершать цикл, это уж вам решать)
if (fastTask != readTask)
{
localCts.Cancel();
await readTask; // здесь с большой вероятностью вывалится OperationCanceledException
break;
}
if (readTask.IsCompletedSuccessfully)
{
var bytesRead = readTask.Result;
if (bytesRead == 0)
{
isSocketConnected = false;
break;
}
packetContainer.Append(buffer.Take(bytesRead));
var packets = packetContainer.FetchFullPackets();
await ProcessPacketsAsync(packets);
isSocketConnected = CheckSocketConnection(socket);
}
}
Виртуальный выделенный сервер (VDS) становится отличным выбором
Пользователь создаёт запись в файлеДалее, если нужно, он может переписать нужную запись
В общем, имеются в программе события, одно из них-CollectionCountChanged, которое генерируется функцией OnCollectionCountChanged:
Формат содержимого файла: дата,фамилия сотрудника,кол-во часов роботы,проделанная работа