D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
usr
/
share
/
grafana
/
public
/
app
/
plugins
/
datasource
/
tempo
/
Filename :
streaming.ts
back
Copy
import { capitalize } from 'lodash'; import { map, Observable, defer, mergeMap } from 'rxjs'; import { v4 as uuidv4 } from 'uuid'; import { DataFrame, DataQueryRequest, DataQueryResponse, DataSourceInstanceSettings, FieldType, LiveChannelScope, LoadingState, MutableDataFrame, ThresholdsConfig, ThresholdsMode, } from '@grafana/data'; import { getGrafanaLiveSrv } from '@grafana/runtime'; import { SearchStreamingState } from './dataquery.gen'; import { TempoDatasource } from './datasource'; import { createTableFrameFromTraceQlQuery } from './resultTransformer'; import { SearchMetrics, TempoJsonData, TempoQuery } from './types'; export async function getLiveStreamKey(): Promise<string> { return uuidv4(); } export function doTempoChannelStream( query: TempoQuery, ds: TempoDatasource, options: DataQueryRequest<TempoQuery>, instanceSettings: DataSourceInstanceSettings<TempoJsonData> ): Observable<DataQueryResponse> { const range = options.range; let frames: DataFrame[] | undefined = undefined; let state: LoadingState = LoadingState.NotStarted; return defer(() => getLiveStreamKey()).pipe( mergeMap((key) => { const requestTime = performance.now(); return getGrafanaLiveSrv() .getStream<MutableDataFrame>({ scope: LiveChannelScope.DataSource, namespace: ds.uid, path: `search/${key}`, data: { ...query, timeRange: { from: range.from.toISOString(), to: range.to.toISOString(), }, }, }) .pipe( map((evt) => { if ('message' in evt && evt?.message) { const currentTime = performance.now(); const elapsedTime = currentTime - requestTime; // Schema should be [traces, metrics, state, error] const traces = evt.message.data.values[0][0]; const metrics = evt.message.data.values[1][0]; const frameState: SearchStreamingState = evt.message.data.values[2][0]; const error = evt.message.data.values[3][0]; switch (frameState) { case SearchStreamingState.Done: state = LoadingState.Done; break; case SearchStreamingState.Streaming: state = LoadingState.Streaming; break; case SearchStreamingState.Error: throw new Error(error); } frames = [ metricsDataFrame(metrics, frameState, elapsedTime), ...createTableFrameFromTraceQlQuery(traces, instanceSettings), ]; } return { data: frames || [], state, }; }) ); }) ); } function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState, elapsedTime: number) { const progressThresholds: ThresholdsConfig = { steps: [ { color: 'blue', value: -Infinity, }, { color: 'green', value: 75, }, ], mode: ThresholdsMode.Absolute, }; const frame: DataFrame = { refId: 'streaming-progress', name: 'Streaming Progress', length: 1, fields: [ { name: 'state', type: FieldType.string, values: [capitalize(state.toString())], config: { displayNameFromDS: 'State', }, }, { name: 'elapsedTime', type: FieldType.number, values: [elapsedTime], config: { unit: 'ms', displayNameFromDS: 'Elapsed Time', }, }, { name: 'totalBlocks', type: FieldType.number, values: [metrics.totalBlocks], config: { displayNameFromDS: 'Total Blocks', }, }, { name: 'completedJobs', type: FieldType.number, values: [metrics.completedJobs], config: { displayNameFromDS: 'Completed Jobs', }, }, { name: 'totalJobs', type: FieldType.number, values: [metrics.totalJobs], config: { displayNameFromDS: 'Total Jobs', }, }, { name: 'progress', type: FieldType.number, values: [ state === SearchStreamingState.Done ? 100 : ((metrics.completedJobs || 0) / (metrics.totalJobs || 1)) * 100, ], config: { displayNameFromDS: 'Progress', unit: 'percent', min: 0, max: 100, custom: { cellOptions: { type: 'gauge', mode: 'gradient', }, }, thresholds: progressThresholds, }, }, ], meta: { preferredVisualisationType: 'table', }, }; return frame; }