Updated with scheduler

This commit is contained in:
Martino Ferrari
2026-02-21 20:20:08 +01:00
parent 7cc4b81f05
commit 955eb02924
15 changed files with 664 additions and 102 deletions

View File

@@ -0,0 +1,153 @@
#include "DebugFastScheduler.h"
#include "AdvancedErrorManagement.h"
#include "ExecutionInfo.h"
#include "MultiThreadService.h"
#include "RealTimeApplication.h"
#include "Threads.h"
#include "MemoryOperationsHelper.h"
namespace MARTe {
const uint64 ALL_CPUS = 0xFFFFFFFFFFFFFFFFull;
DebugFastScheduler::DebugFastScheduler() :
FastScheduler(),
debugBinder(*this, &DebugFastScheduler::Execute)
{
debugService = NULL_PTR(DebugService*);
}
DebugFastScheduler::~DebugFastScheduler() {
}
bool DebugFastScheduler::Initialise(StructuredDataI & data) {
bool ret = FastScheduler::Initialise(data);
if (ret) {
ReferenceContainer *root = ObjectRegistryDatabase::Instance();
Reference serviceRef = root->Find("DebugService");
if (serviceRef.IsValid()) {
debugService = dynamic_cast<DebugService*>(serviceRef.operator->());
}
}
return ret;
}
ErrorManagement::ErrorType DebugFastScheduler::DebugSetupThreadMap() {
ErrorManagement::ErrorType err;
ComputeMaxNThreads();
REPORT_ERROR(ErrorManagement::Information, "DebugFastScheduler: Max Threads=%!", maxNThreads);
multiThreadService = new (NULL) MultiThreadService(debugBinder);
multiThreadService->SetNumberOfPoolThreads(maxNThreads);
err = multiThreadService->CreateThreads();
if (err.ErrorsCleared()) {
rtThreadInfo[0] = new RTThreadParam[maxNThreads];
rtThreadInfo[1] = new RTThreadParam[maxNThreads];
for (uint32 i = 0u; i < numberOfStates; i++) {
cpuMap[i] = new uint64[maxNThreads];
for (uint32 j = 0u; j < maxNThreads; j++) {
cpuMap[i][j] = ALL_CPUS;
}
}
if (countingSem.Create(maxNThreads)) {
for (uint32 i = 0u; i < numberOfStates; i++) {
uint32 nThreads = states[i].numberOfThreads;
cpuThreadMap[i] = new uint32[nThreads];
for (uint32 j = 0u; j < nThreads; j++) {
uint64 cpu = static_cast<uint64>(states[i].threads[j].cpu.GetProcessorMask());
CreateThreadMap(cpu, i, j);
}
}
}
}
return err;
}
void DebugFastScheduler::CustomPrepareNextState() {
ErrorManagement::ErrorType err;
err = !realTimeApplicationT.IsValid();
if (err.ErrorsCleared()) {
uint8 nextBuffer = static_cast<uint8>(realTimeApplicationT->GetIndex());
nextBuffer++;
nextBuffer &= 0x1u;
if (!initialised) {
cpuMap = new uint64*[numberOfStates];
cpuThreadMap = new uint32*[numberOfStates];
err = DebugSetupThreadMap();
}
if (err.ErrorsCleared()) {
for (uint32 j = 0u; j < maxNThreads; j++) {
rtThreadInfo[nextBuffer][j].executables = NULL_PTR(ExecutableI **);
rtThreadInfo[nextBuffer][j].numberOfExecutables = 0u;
rtThreadInfo[nextBuffer][j].cycleTime = NULL_PTR(uint32 *);
rtThreadInfo[nextBuffer][j].lastCycleTimeStamp = 0u;
}
ScheduledState *nextState = GetSchedulableStates()[nextBuffer];
uint32 numberOfThreads = nextState->numberOfThreads;
for (uint32 i = 0u; i < numberOfThreads; i++) {
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].executables = nextState->threads[i].executables;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].numberOfExecutables = nextState->threads[i].numberOfExecutables;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].cycleTime = nextState->threads[i].cycleTime;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].lastCycleTimeStamp = 0u;
}
}
}
}
ErrorManagement::ErrorType DebugFastScheduler::Execute(ExecutionInfo & information) {
ErrorManagement::ErrorType ret;
if (information.GetStage() == MARTe::ExecutionInfo::StartupStage) {
}
else if (information.GetStage() == MARTe::ExecutionInfo::MainStage) {
uint32 threadNumber = information.GetThreadNumber();
(void) eventSem.Wait(TTInfiniteWait);
if (superFast == 0u) {
(void) countingSem.WaitForAll(TTInfiniteWait);
}
uint32 idx = static_cast<uint32>(realTimeApplicationT->GetIndex());
if (rtThreadInfo[idx] != NULL_PTR(RTThreadParam *)) {
if (rtThreadInfo[idx][threadNumber].numberOfExecutables > 0u) {
// EXECUTION CONTROL HOOK
if (debugService != NULL_PTR(DebugService*)) {
while (debugService->IsPaused()) {
Sleep::MSec(1);
}
}
bool ok = ExecuteSingleCycle(rtThreadInfo[idx][threadNumber].executables, rtThreadInfo[idx][threadNumber].numberOfExecutables);
if (!ok) {
if (errorMessage.IsValid()) {
(void)MessageI::SendMessage(errorMessage, this);
}
}
uint32 absTime = 0u;
if (rtThreadInfo[idx][threadNumber].lastCycleTimeStamp != 0u) {
uint64 tmp = (HighResolutionTimer::Counter() - rtThreadInfo[idx][threadNumber].lastCycleTimeStamp);
float64 ticksToTime = (static_cast<float64>(tmp) * clockPeriod) * 1e6;
absTime = static_cast<uint32>(ticksToTime);
}
uint32 sizeToCopy = static_cast<uint32>(sizeof(uint32));
(void)MemoryOperationsHelper::Copy(rtThreadInfo[idx][threadNumber].cycleTime, &absTime, sizeToCopy);
rtThreadInfo[idx][threadNumber].lastCycleTimeStamp = HighResolutionTimer::Counter();
}
else {
(void) unusedThreadsSem.Wait(TTInfiniteWait);
}
}
}
return ret;
}
CLASS_REGISTER(DebugFastScheduler, "1.0")
}

View File

@@ -3,6 +3,7 @@
#include "StreamString.h"
#include "BasicSocket.h"
#include "DebugBrokerWrapper.h"
#include "DebugFastScheduler.h"
#include "ObjectRegistryDatabase.h"
#include "ClassRegistryItem.h"
#include "ObjectBuilder.h"
@@ -119,7 +120,12 @@ bool DebugService::Initialise(StructuredDataI & data) {
(void)data.Read("TcpLogPort", logPort);
}
(void)data.Read("StreamIP", streamIP);
StreamString tempIP;
if (data.Read("StreamIP", tempIP)) {
streamIP = tempIP;
} else {
streamIP = "127.0.0.1";
}
uint32 suppress = 1;
if (data.Read("SuppressTimeoutLogs", suppress)) {
@@ -148,14 +154,21 @@ bool DebugService::Initialise(StructuredDataI & data) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open TCP Server Socket");
return false;
}
if (!tcpServer.Listen(controlPort)) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to Listen on port %u", controlPort);
return false;
}
if (!udpSocket.Open()) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open UDP Socket");
return false;
}
if (!logServer.Open()) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open Log Server Socket");
return false;
}
(void)logServer.Listen(logPort);
if (threadService.Start() != ErrorManagement::NoError) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start Server thread");
@@ -201,6 +214,10 @@ void DebugService::PatchRegistry() {
PatchItemInternal("MemoryMapSynchronisedMultiBufferInputBroker", &b8);
static DebugMemoryMapSynchronisedMultiBufferOutputBrokerBuilder b9;
PatchItemInternal("MemoryMapSynchronisedMultiBufferOutputBroker", &b9);
// Patch Scheduler
static ObjectBuilderT<DebugFastScheduler> schedBuilder;
PatchItemInternal("FastScheduler", &schedBuilder);
}
void DebugService::ProcessSignal(DebugSignalInfo* s, uint32 size) {
@@ -295,9 +312,6 @@ static bool RecursiveGetFullObjectName(ReferenceContainer *container, const Obje
bool DebugService::GetFullObjectName(const Object &obj, StreamString &fullPath) {
fullPath = "";
if (RecursiveGetFullObjectName(ObjectRegistryDatabase::Instance(), obj, fullPath)) {
StreamString abs = "Root.";
abs += fullPath;
fullPath = abs;
return true;
}
return false;
@@ -311,7 +325,6 @@ ErrorManagement::ErrorType DebugService::Server(ExecutionInfo & info) {
if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError;
if (info.GetStage() == ExecutionInfo::StartupStage) {
serverThreadId = Threads::Id();
if (!tcpServer.Listen(controlPort)) return ErrorManagement::FatalError;
return ErrorManagement::NoError;
}
@@ -375,7 +388,6 @@ ErrorManagement::ErrorType DebugService::LogStreamer(ExecutionInfo & info) {
if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError;
if (info.GetStage() == ExecutionInfo::StartupStage) {
logStreamerThreadId = Threads::Id();
if (!logServer.Listen(logPort)) return ErrorManagement::FatalError;
return ErrorManagement::NoError;
}