Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ pub enum Feature {
/// Enable the upload endpoint for attachments.
#[serde(rename = "projects:relay-upload-endpoint")]
UploadEndpoint,
/// Enable the new Error processing pipeline in Relay.
#[serde(rename = "organizations:relay-new-error-processing")]
NewErrorProcessing,
/// Enable the new Client Reports pipeline in Relay.
#[serde(rename = "organizations:new-client-report-processing")]
NewClientReportProcessing,
Expand Down
3 changes: 0 additions & 3 deletions relay-server/src/processing/errors/errors/nswitch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ impl Counted for Nswitch {
/// An error returned when parsing the dying message attachment.
#[derive(Debug, thiserror::Error)]
pub enum SwitchProcessingError {
#[error("invalid json")]
#[cfg_attr(not(feature = "processing"), expect(unused))]
InvalidJson(#[source] serde_json::Error),
#[error("envelope parsing failed")]
EnvelopeParsing(#[from] EnvelopeError),
#[error("unexpected EOF, expected {expected:?}")]
Expand Down
14 changes: 1 addition & 13 deletions relay-server/src/processing/utils/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use relay_statsd::metric;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::envelope::AttachmentType;
use crate::envelope::{Envelope, EnvelopeHeaders, Item};
use crate::envelope::{EnvelopeHeaders, Item};
use crate::processing::Context;
use crate::services::processor::{MINIMUM_CLOCK_DRIFT, ProcessingError};
use crate::services::projects::project::ProjectInfo;
Expand Down Expand Up @@ -391,18 +391,6 @@ pub fn filter(
#[derive(Debug, Copy, Clone)]
pub struct EventFullyNormalized(pub bool);

impl EventFullyNormalized {
/// Returns `true` if the event is fully normalized, `false` otherwise.
pub fn new(envelope: &Envelope) -> Self {
let event_fully_normalized = envelope.meta().request_trust().is_trusted()
&& envelope
.items()
.any(|item| item.creates_event() && item.fully_normalized());

Self(event_fully_normalized)
}
}

/// New type representing whether metrics were extracted from transactions/spans.
#[derive(Debug, Copy, Clone)]
pub struct EventMetricsExtracted(pub bool);
Expand Down
157 changes: 4 additions & 153 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_protocol::Annotated;
Expand All @@ -37,9 +35,7 @@ use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::Integration;
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
Expand All @@ -57,10 +53,7 @@ use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::TransactionProcessor;
use crate::processing::utils::event::{
EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
event_type,
};
use crate::processing::utils::event::event_category;
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
Expand Down Expand Up @@ -92,8 +85,6 @@ use {
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
pub mod event;
mod metrics;
mod nel;
Expand All @@ -104,11 +95,6 @@ mod span;
#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

/// Creates the block only if used with `processing` feature.
///
Expand Down Expand Up @@ -174,16 +160,9 @@ macro_rules! processing_group {
};
}

/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
Expand Down Expand Up @@ -1293,126 +1272,6 @@ impl EnvelopeProcessorService {
} else { Ok(cached_result.event) })
}

/// Processes the general errors, and the items which require or create the events.
async fn process_errors(
&self,
managed_envelope: &mut TypedEnvelope<ErrorGroup>,
project_id: ProjectId,
mut ctx: processing::Context<'_>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
let mut metrics = Metrics::default();
let mut extracted_metrics = ProcessingExtractedMetrics::new();

// Events can also contain user reports.
report::process_user_reports(managed_envelope);

if_processing!(self.inner.config, {
unreal::expand(managed_envelope, &self.inner.config)?;
#[cfg(sentry)]
playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
nnswitch::expand(managed_envelope)?;
});

let mut event = event::extract(
managed_envelope,
&mut metrics,
event_fully_normalized,
&self.inner.config,
)?;

if_processing!(self.inner.config, {
if let Some(inner_event_fully_normalized) =
unreal::process(managed_envelope, &mut event)?
{
event_fully_normalized = inner_event_fully_normalized;
}
#[cfg(sentry)]
if let Some(inner_event_fully_normalized) =
playstation::process(managed_envelope, &mut event, ctx.project_info)?
{
event_fully_normalized = inner_event_fully_normalized;
}
if let Some(inner_event_fully_normalized) =
attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
{
event_fully_normalized = inner_event_fully_normalized;
}
});

ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
managed_envelope,
&mut event,
ctx.project_info,
ctx.sampling_project_info,
);

let attachments = managed_envelope
.envelope()
.items()
.filter(|item| item.attachment_type() == Some(AttachmentType::Attachment));
processing::utils::event::finalize(
managed_envelope.envelope().headers(),
&mut event,
attachments,
&mut metrics,
ctx.config,
)?;
event_fully_normalized = processing::utils::event::normalize(
managed_envelope.envelope().headers(),
&mut event,
event_fully_normalized,
project_id,
ctx,
&self.inner.geoip_lookup,
)?;
let filter_run =
processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
.map_err(ProcessingError::EventFiltered)?;

if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
dynamic_sampling::tag_error_with_sampling_decision(
managed_envelope,
&mut event,
ctx.sampling_project_info,
&self.inner.config,
)
.await;
}

event = self
.enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
.await?;

if event.value().is_some() {
processing::utils::event::scrub(&mut event, ctx.project_info)?;
event::serialize(
managed_envelope,
&mut event,
event_fully_normalized,
EventMetricsExtracted(false),
SpansExtracted(false),
)?;
event::emit_feedback_metrics(managed_envelope.envelope());
}

let attachments = managed_envelope
.envelope_mut()
.items_mut()
.filter(|i| i.ty() == &ItemType::Attachment);
processing::utils::attachments::scrub(attachments, ctx.project_info);

if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
relay_log::error!(
tags.project = %project_id,
tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
"ingested event without normalizing"
);
}

Ok(Some(extracted_metrics))
}

/// Processes standalone items that require an event ID, but do not have an event on the same envelope.
async fn process_standalone(
&self,
Expand Down Expand Up @@ -1634,16 +1493,8 @@ impl EnvelopeProcessorService {

match group {
ProcessingGroup::Error => {
if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
self.process_with_processor(
&self.inner.processing.errors,
managed_envelope,
ctx,
)
self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
.await
} else {
run!(process_errors, project_id, ctx)
}
}
ProcessingGroup::Transaction => {
self.process_with_processor(
Expand Down
45 changes: 0 additions & 45 deletions relay-server/src/services/processor/attachment.rs

This file was deleted.

Loading
Loading