BackgroundService не может корректно завершиться

164
18 апреля 2022, 08:10

Не могу понять, почему мой BackgroundService (.NET Core 3.1) не может корректно завершить свою работу.

Написал сервис, который должен слушать tcp-порт и обрабатывать сообщения определенного формата. Написал следующий BackgroundService:

internal sealed class TcpListenerBackgroundService : BackgroundService
{
    private readonly ITcpPortListener _tcpPortListener;
    private readonly ILogger<TcpListenerBackgroundService> _logger;
    public TcpListenerBackgroundService(ITcpPortListener tcpListener, ILogger<TcpListenerBackgroundService> logger)
    {
        _tcpPortListener = tcpListener;
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await _tcpPortListener.ListenAsync(stoppingToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unexpected error was occured");
        }
    }
}

Сюда в качестве ITcpPortListener внедряется следующий TcpPortListener:

public sealed class TcpPortListener : ITcpPortListener
{
    private readonly ILogger<TcpPortListener> _logger;
    private readonly IPacketProcessor _packetProcessor;
    private readonly TcpListener _listener;
    private TaskCompletionSource<object> _appTerminationSource;
    public TcpPortListener(
        ITcpPortListenerConfiguration config,
        IPacketProcessor packetProcessor,
        ILogger<TcpPortListener> logger)
    {
        _listener = new TcpListener(IPAddress.Any, config.PortNumber);
        _packetProcessor = packetProcessor;
        _logger = logger;
    }
    public async Task ListenAsync(CancellationToken stoppingToken)
    {
        _appTerminationSource = new TaskCompletionSource<object>();
        await using (stoppingToken.Register(_appTerminationSource.SetCanceled))
        {
            var taskList = new List<Task>();
            _listener.Start();
            while (!stoppingToken.IsCancellationRequested)
            {
                var acceptClientTask = _listener.AcceptTcpClientAsync();
                await Task.WhenAny(acceptClientTask, _appTerminationSource.Task);
                if (acceptClientTask.IsCompletedSuccessfully)
                {
                    var client = acceptClientTask.Result;
                    var processTask = ProcessClientAsync(client, stoppingToken);
                    taskList.Add(processTask);
                }
                taskList.RemoveAll(p => p.IsCompleted);
            }
            _logger.LogInformation("Waiting for all clients termination...");
            await Task.WhenAll(taskList.ToArray());
            _logger.LogInformation("Terminate listening...");
            _listener.Stop();
            _logger.LogInformation("Listening was terminated successfully!");
        }
    }
    private async Task ProcessClientAsync(TcpClient client, CancellationToken stoppingToken)
    {
        var clientIp = GetClientIp(client);
        _logger.LogInformation($"Client[{clientIp}]. Connected");
        try
        {
            var stream = client.GetStream();
            var socket = client.Client;
            _logger.LogInformation($"Client[{clientIp}]. Start processing");
            var buffer = new byte[1024];
            var packetContainer = new SuntechPacketContainer();
            var isSocketConnected = CheckSocketConnection(socket);
            while (isSocketConnected && !stoppingToken.IsCancellationRequested)
            {
                var readTask = stream.ReadAsync(buffer, 0, buffer.Length, stoppingToken);
                await Task.WhenAny(readTask, _appTerminationSource.Task);
                if (readTask.IsCompletedSuccessfully)
                {
                    var bytesRead = readTask.Result;
                    if (bytesRead == 0)
                    {
                        isSocketConnected = false;
                        break;
                    }
                    packetContainer.Append(buffer.Take(bytesRead));
                    var packets = packetContainer.FetchFullPackets();
                    await ProcessPacketsAsync(packets);
                    isSocketConnected = CheckSocketConnection(socket);
                }
            }
            if (!isSocketConnected)
            {
                _logger.LogInformation($"Client[{clientIp}]. Socket disconnected. The message processing for the client has been stopped");
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"Client[{clientIp}]. Unexpected error was occured during client processing");
        }
        finally
        {
            _logger.LogInformation($"Client[{clientIp}]. Closing the client connection");
            client.Close();
        }
    }
    private static string GetClientIp(TcpClient client)
    {
        return ((IPEndPoint)client.Client.RemoteEndPoint).Address.ToString();
    }
    private async Task ProcessPacketsAsync(IEnumerable<byte[]> packets)
    {
        foreach (var packet in packets)
        {
            await _packetProcessor.ProcessPacketAsync(packet);
        }
    }
    private static bool CheckSocketConnection(Socket socket)
    {
        const int connectionTimeoutInMicroseconds = 1_000;
        var poll = socket.Poll(connectionTimeoutInMicroseconds, SelectMode.SelectRead);
        return !((poll && (socket.Available == 0)) || !socket.Connected);
    }
}

Развернул этот сервис на CentOS 7 с помощью systemd.

Проблема в том, что иногда (не знаю точно, при каких условиях), если к сервису подключен хотя бы один клиент, и в этот момент я захочу остановить сервис (вызвать service stop), то сервис будет бесконечно завершаться и systemd повиснет, из-за чего придется убивать ее, после чего, узнав статус сервиса, мне будет сказано, что процесс был убит из-за истечения таймаута ожидания. Я совсем не могу понять, почему так происходит, ведь по логам метод ListenAsync отработал. Это точно не из-за того, что метод ListenAsync долго завершается. Прилагаю пример логов:

//...
2020-11-03 04:01:22.299 -05:00 [INF] Application is shutting down...
2020-11-03 04:01:22.333 -05:00 [INF] Client[127.0.0.1]. Closing the client connection
2020-11-03 04:01:22.334 -05:00 [INF] Waiting for all clients termination...
2020-11-03 04:01:22.335 -05:00 [INF] Terminate listening...
2020-11-03 04:01:22.336 -05:00 [INF] Listening was terminated successfully!

Есть ощущение, будто в некоторых случаях не освобождаются все ресурсы. Прошу проинспектировать мой код на наличие возможных неточностей с управлением ресурсами. Мой глаз уже намылен, ничего не видит.

Answer 1

if (readTask.IsCompletedSuccessfully) вот здесь ошибка. Если будет false, оно просто пропустит обработку и пойдет читать дальше по циклу, и так оно может запустить миллион тасков, которые будут ждать что-то там из сокета. Мне кажется, надо сделать break цикла.

Eсли WhenAny отработал через _appTerminationSource.Task, вам обязательно надо где-то вызвать await readTask, чтобы убедиться что операция чтения завершена прежде чем гасить клиент. Возможно оттуда даже исключение выбросится при await.

Попробуйте такой паттерн

while (isSocketConnected && !stoppingToken.IsCancellationRequested)
{
    using CancellationTokenSource localCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
    var readTask = stream.ReadAsync(buffer, 0, buffer.Length, localCts.Token);
    var fastTask = await Task.WhenAny(readTask, _appTerminationSource.Task);
    
    // WhenAny не дождался readTask, надо канселить локальный токен и завершать цикл
    // (или не завершать цикл, это уж вам решать)
    if (fastTask != readTask)
    {
        localCts.Cancel();
        await readTask; // здесь с большой вероятностью вывалится OperationCanceledException
        break;
    }
    if (readTask.IsCompletedSuccessfully)
    {
        var bytesRead = readTask.Result;
        if (bytesRead == 0)
        {
            isSocketConnected = false;
            break;
        }
        packetContainer.Append(buffer.Take(bytesRead));
        var packets = packetContainer.FetchFullPackets();
        await ProcessPacketsAsync(packets);
        isSocketConnected = CheckSocketConnection(socket);
    }
}
READ ALSO
C# переписать строку в файле

C# переписать строку в файле

Пользователь создаёт запись в файлеДалее, если нужно, он может переписать нужную запись

142
Вопрос по синтаксису вызова делегата в c# [дубликат]

Вопрос по синтаксису вызова делегата в c# [дубликат]

В общем, имеются в программе события, одно из них-CollectionCountChanged, которое генерируется функцией OnCollectionCountChanged:

124
Как переписать данные файла

Как переписать данные файла

Формат содержимого файла: дата,фамилия сотрудника,кол-во часов роботы,проделанная работа

182
Путь содержит недопустимые знаки C#

Путь содержит недопустимые знаки C#

При обращении к файлу выдает исключение:

163