Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1500,9 +1500,8 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)

for (auto sci : pollOrder) {
auto& info = state.inputChannelInfos[sci];
auto& channelSpec = spec.inputChannels[sci];
O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", info.channel->GetName().c_str());

if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
context.allDone = false;
Expand All @@ -1514,18 +1513,18 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
DataProcessingDevice::handleData(ref, info);
}
O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
channelSpec.name.c_str(), (int)info.state, info.parts.Size());
info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
continue;
}
if (info.channel == nullptr) {
O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
channelSpec.name.c_str(), (int)info.state, info.parts.Size());
info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
continue;
}
// Only poll DPL channels for now.
if (info.channelType != ChannelAccountingType::DPL) {
O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
channelSpec.name.c_str(), (int)info.state, info.parts.Size());
info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
continue;
}
auto& socket = info.channel->GetSocket();
Expand All @@ -1537,7 +1536,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
socket.Events(&info.hasPendingEvents);
// If we do not read, we can continue.
if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", info.channel->GetName().c_str());
continue;
}
}
Expand All @@ -1555,12 +1554,12 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
bool newMessages = false;
while (true) {
O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
info.channel->GetName().c_str(), info.id.value, info.oldestForChannel.value);
if (info.parts.Size() < 64) {
fair::mq::Parts parts;
info.channel->Receive(parts, 0);
if (parts.Size()) {
O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), info.channel->GetName().c_str(), info.id.value);
}
for (auto&& part : parts) {
info.parts.fParts.emplace_back(std::move(part));
Expand Down Expand Up @@ -1589,7 +1588,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
}
}
O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
channelSpec.name.c_str(), info.id.value);
info.channel->GetName().c_str(), info.id.value);
}
}

Expand Down