object Lock_s1 = new object();
object Lock_s2 = new object();
object Lock_s3 = new object();
async void P1()
{
await Task.Run(() =>
{
lock (Lock_s1) Действие_1;
});
}
async void P2()
{
await Task.Run(() =>
{
lock (Lock_s2) Действие_2;
});
}
async void P3()
{
await Task.Run(() =>
{
lock (Lock_s3) Действие_3;
});
}
async void P4()
{
await Task.Run(() =>
{
lock (Lock_s1) lock (Lock_s2) lock (Lock_s3)
{
Действия_4;
}
});
}
В моем случае нужно, чтоб процедуры P1,P2,P3 могли пересекаться между собой.
Но не с процедурой P4.
Этот код работает, но слишком много lock в void P4, особенно если блокировать нужно будет не 3 объекта а 50.. 500..
Так работает вроде:
public static void ololok(int k)
{
lock (obj_massiv[k-1])
{
if (k == 0) Действие;
else ololok(--k);
}
}
Где k
- размер массива локов.
Вы, конечно, можете использовать блокировку объекта, или создать массив syncRoot
'ов, но это не эффективно. Совсем! Мой алгоритм, конечно, сложен, но он довольно эффективен. Кода много, но сам по себе он средней сложности, все сложные или не типовые моменты я прокомментировал; если что-то не понятно, welcome в комментарии.
Ссылки на доп. инфу по спанам и памяти: ссылки #1,2
Для создания такого (concurrent) массива (я буду использовать fixed-size массив) нам понадобятся:
Memory<T>
– сам массивMemory<Synchronizer>
– массив синхронизаторов. Пока не понятно, каких именноsyncR, syncW
– флаги, указывающие, нужно ли синхронизировать ввод/выводВ качестве синхронизатора мы будет использовать SpinLocker
(не путать со SpinLock
). Он у нас особенный. Он запоминает id блокирующего потока. Если оно совпадает, избегает dead-lock'а, иначе ждёт освобождения locker'а. Это оптимизированная версия обычного Monitor
. Вы, конечно, можете использовать и его, но это очень неэффективно, т. к. создаёт много блоков синхронизации и много объектов, а в нашем SpinLocker
только 1 int
– я вас предупредил.
SpinLocker.cs
public struct SpinLocker
{
int threadId;
public static bool TryLock(ref SpinLocker locker, int threadId) =>
Interlocked.CompareExchange(ref locker.threadId, threadId, 0) == 0;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Lock(ref SpinLocker locker, int threadId)
{
while (!TryLock(ref locker, threadId)) { }
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Lock(ref SpinLocker locker)
{
var curThreadId = Thread.CurrentThread.ManagedThreadId;
Lock(ref locker, curThreadId);
}
public static void Unlock(ref SpinLocker locker) => Volatile.Write(ref locker.threadId, 0);
// прочие методы (приведение к `int`, потокобезопасное прочтение, ...) пропущены
}
Реализация сверх оптимизирована. Самые главные методы: TryLock
, Lock
(обе перегрузки) и Unlock
– сфокусируйте свой внимание на них.
SynchronizationOptions.cs
[Flags]
public enum SynchronizationOptions
{
NonSynchronized = 0x0,
SynchronizedReading = 0x1,
SynchronizedWriting = 0x2,
FullySynchronized = SynchronizedReading | SychronizedWriting
}
ConcurrentServices.cs
public static partial ConcurrentServices
{
public static void Check(this SynchronizationOptions synchronizationOptions)
{
const SynchronizationOptions maxOpt = SynchronizationOptions.FullySynchronized;
if (synchronizationOptions < 0 || synchronizationOptions > maxOpt)
/*throw*/;
}
}
Тут ничего лишнего – думаю, всё понятно.
Вот мы и подобрались к массиву. Проще сперва написать, а потом объяснить.
ConcurrentArray.cs
public class ConcurrentArray<T> : IList<T>
{
public const SynchronizationOptions DefaultSynchronizationOptions = SynchronizationOptions.SynchronizedWriting;
readonly Memory<T> items;
readonly Memory<SpinLocker> itemLockers;
readonly bool syncR, syncW;
public T this[Index index]
{
get
{
var offset = index.GetOffset(Length);
if (syncR) LockItem(index);
var item = Items[offset];
if (syncR) UnlockItem(index);
return item;
}
set
{
var offset = index.GetOffset(Length);
if (syncW) LockItem(index);
Items[offset] = value;
if (syncW) UnlockItem(index);
}
}
public int Length => items.Length; // можно не синхронизировать, т. к. только чтение
public SynchronizationOptions SynchronizationOptions { get; } // можно не синхронизировать, по аналогии
public ConcurrentArray(Memory<T> buffer, SynchronizationOptions synchronizationOptions = DefaultSynchronizationOptions)
{
synchronizationOptions.Check();
items = buffer;
itemLockers = new SpinLocker[Length];
SynchronizationOptions = synchronizationOptions;
syncR = (SynchronizationOptions & SynchronizationOptions.SynchronizedReading) != 0;
syncW = (SynchronizationOptions & SynchronizationOptions.SynchronizedWriting) != 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void LockItem(Index index) // почему `internal` узнаете позже
{
var offset = index.GetOffset(Length);
SpinLocker.Lock(ref itemLockers.Span[offset]);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void UnlockItem(Index index)
{
var offset = index.GetOffset(Length);
SpinLocker.Unlock(ref itemLockers.Span[offset]);
}
// прочие члены пропущены...
}
Ну вот казалось бы и всё? Да, этот массив работает, но есть 1 "но". Если вы попытаетесь его протестировать, скажем, на инкременте (10 потоков одновременно увеличивают значение 1-го элемента массива типа int
до 1 000 000 каждый поток), то вы столкнётесь с неприятным результатом. Пока вы увеличиваете значение, 1-2 потока его уже увеличили, и вы присваиваете "устаревшее" значение, таким образом вы получаете значение намного меньше ожидаемого. Как с этим бороться?
Ответ прост, нужно создать disposable struct
. При его создании элемент блокируется, а при его disposing'е – освобождается. Но есть 1 "но". Это не просто struct
, а ref struct
, т. к. нам нужно, чтобы данный объект не передавали в другие потоки, что сильно усложняет задачу. Но я опять же нашёл решение, немного жёсткое, но что поделать.
ConcurrentItemRef
Это потокобезопасная ссылка на элемент. Причём буквально это управляемая GC ссылка на элемент. Пара типов-helper'ов.
Ссылки
Ref.cs
public readonly ref struct Ref<T>
{
public static Ref<T> Null => default;
readonly Span<T> targetWrapper;
[NotNull] // возврат не null
public ref T Target
{
get
{
if (IsNull) throw new NullReferenceException();
return ref MemoryMarshal.GetReference(targetWrapper);
}
}
public bool IsNull => targetWrapper.IsEmpty;
public bool NotNull => !IsNull;
public Ref([AllowNull] /*может быть null*/ ref T target)
{
targetWrapper = target is null ? default : MemoryMarshal.CreateSpan(ref target, 1);
}
public static Ref<T> Wrap([AllowNull] ref T target) => new Ref<T>(ref target);
public static Ref<T> GetRef(Span<T> buffer) => buffer.IsEmpty ? default :
new Ref<T>(ref MemoryMarshal.GetReference(buffer));
public bool TryGetTarget([MaybeNullWhen(false)] out T target)
{
target = default!;
if (IsNull) return false;
target = ref MemoryMarshal.GetReference(targetWrapper);
return true;
}
// прочие члены пропущены...
}
ReadOnlyRef.cs
public readonly ref struct ReadOnlyRef<T>
{
public static Ref<T> Null => default;
readonly ReadOnlySpan<T> targetWrapper;
[NotNull]
public readonly ref T Target
{
get
{
if (IsNull) throw new NullReferenceException();
return ref MemoryMarshal.GetReference(targetWrapper);
}
}
public bool IsNull => targetWrapper.IsEmpty;
public bool NotNull => !IsNull;
public ReadOnlyRef([AllowNull] in T target)
{
targetWrapper = target is null ? default : MemoryMarshal.CreateSpan(ref Unsafe.AsRef(in target), 1);
}
public static ReadOnlyRef<T> Wrap([AllowNull] in T target) => new ReadOnlyRef<T>(in target);
public static ReadOnlyRef<T> GetRef(ReadOnlySpan<T> buffer) => buffer.IsEmpty ? default :
new ReadOnlyRef<T>(in MemoryMarshal.GetReference(buffer));
public bool TryGetTarget([MaybeNullWhen(false)] out T target)
{
target = default!;
if (IsNull) return false;
target = MemoryMarshal.GetReference(targetWrapper);
return true;
}
// прочие члены пропущены...
}
Смотрите в чём дело. Нам в будущем понадобится хранить ссылку на класс, но это ref struct
(ссылка #3). Как быть? Обернуть её в Span
(ссылка #3)! Ссылка – это Span
с длиной 1, т. е. ref struct
. Мы его обернули в тип Ref
/ReadOnlyRef
, чтоб просто лучше выглядело.
Сама "ссылка на элемент"
ConcurrentArray.cs
public class ConcurrentArray<T> : IList<T>
{
<...>
readonly ConcurrentItemRef<T>.ReleaseCallback releaseCallback;
public ConcurrentArray(<...>)
{
<...>
releaseCallback = UnlockItem;
}
internal Memory<T> AsMemory() => items; // возвращает несинхронизированную память, поэтому `internal`
public ConcurrentItemRef<T> CreateItemRef(Index index) => new ConcurrentItemRef<T>(this, index);
<...>
}
ConcurrentItemRef.cs
public ref struct ConcurrentItemRef<T>
{
Ref<T> valueRef; // ссылка на элемент
ReadOnlyRef<ReleaseCallback> releaseCallbackRef; // ссылка на release callback
readonly int offset; // смещение нашего элемента
public ref T Value
{
get
{
ThrowIfReleased();
return ref valueRef.Target;
}
}
bool IsReleased => valueRef.IsNull;
internal ConcurrentItemRef(ConcurrentArray<T> array, Index index)
{
if (array == null) throw new ArgumentNullException("array");
offset = index.GetOffset(array.Length);
valueRef = Ref<T>.GetRef(array.AsMemory().Slice(offset, 1).Span);
releaseCallbackRef = ReadOnlyRef<ReleaseCallback>.Wrap(in array.releaseCallback);
array.LockItem(index); // вот и пригодился нам `internal` `LockItem`
}
public void Release()
{
if (IsReleased) return;
if (releaseCallbackRef.TryGetTarget(out var releaseCallback))
{
releaseCallback.Invoke(offset);
releaseCallbackRef = default;
}
valueRef = default;
}
public void Dispose() => Release();
void ThrowIfReleased()
{
if (IsReleased) throw new InvalidOperationException("This concurrent item ref is released");
}
internal delegate void ReleaseCallback(Index offset);
}
Вы меня, наверно, спросите: "Зачем нам брать и создавать кокой-то release callback, если можно просто запихнуть массив?" Нет, нельзя. Т. к. Ref
– это ссылка на класс, а массив – это просто класс, т. е. нам где-то нужно инициализировать ссылку на массив, а т. к. единственное место где это без ошибки компиляции можно сделать – стек (т. е. нигде, т. к. по окончанию выполнения метода мы сразу же потеряет доступ к ссылке), то мы не можем передать его на прямую, да и это играет на расширяемости (можно создать, например, ConcurrencyList
, который тоже их [ConcurrentcyItemRef
] будет возвращать).
Наконец-то тестирование всего этого кода! Это тот самый инкремент.
[TestMethod]
public void Test()
{
const int taskCount = 5;
var array = new ConcurrentArray<int>(1);
var tasks = new Task[taskCount];
for (var i = 0; i < taskCount; i++)
tasks[i] = CountAsync(array, 0);
Task.WhenAll(tasks).Wait();
Assert.AreEqual(5_000_000, array[0]);
}
async Task CountAsync(ConcurrentArray<int> array, Index index)
{
await Task.CompletedTask.ConfigureAwait(false); // для аснихронности (ссылки #4,5)
for (var i = 0; i < 1_000_000; i++)
using (var itemRef = array.CreateItemRef(index))
itemRef.Value++;
}
Думаю, вы понимаете результат.
Ссылкиref struct
хранить ref fields
(en)async
/await
Виртуальный выделенный сервер (VDS) становится отличным выбором
У меня есть Input Field для ввода названия проекта, мне нужно, чтобы текст в нем обнулялся по нажатию на кнопку "Отмена"Подскажите, пожалуйста, решение...
Как с помощью linq заменить все элементы целочисленной последовательности, заданной с помощью EnumerableRange на сумму их цифр
Имеется таблица с более, чем 3 млн строкПо этой таблице необходимо делать запрос на получение количества строк в определённые временные промежутки...