@@ -12,7 +12,7 @@ import {
1212} from '@grafana/data' ;
1313
1414import { SearchQuery , ClpDataSourceOptions , DEFAULT_QUERY } from './types' ;
15- import { Observable , forkJoin , lastValueFrom } from 'rxjs' ;
15+ import { Observable , forkJoin , zip , lastValueFrom } from 'rxjs' ;
1616import { map , switchMap , reduce } from 'rxjs/operators' ;
1717import { createParser , type EventSourceMessage , type ParseError } from 'eventsource-parser' ;
1818
@@ -99,8 +99,26 @@ export class DataSource extends DataSourceApi<SearchQuery, ClpDataSourceOptions>
9999 ) ;
100100 }
101101
102+ #fetchTimestampColumnNames( dataset : string ) : Observable < string [ ] > {
103+ return getBackendSrv ( )
104+ . fetch < string [ ] > ( {
105+ url : `${ this . baseUrl } /column_metadata/${ dataset } /timestamp` ,
106+ method : 'GET' ,
107+ } )
108+ . pipe ( map ( ( response ) => response . data ) ) ;
109+ }
110+
111+ #extractField( message : unknown , columnName : string ) : unknown {
112+ const fieldPath = columnName . split ( / (?< ! \\ ) \. / ) . map ( ( s ) => s . replace ( / \\ \. / g, '.' ) ) ;
113+ let current = message ;
114+ for ( const segment of fieldPath ) {
115+ current = ( current as Record < string , unknown > ) [ segment ] ;
116+ }
117+ return current ;
118+ }
119+
102120 query ( options : DataQueryRequest < SearchQuery > ) : Observable < DataQueryResponse > {
103- const observables = options . targets . map ( ( target ) =>
121+ const queryResultsObservables = options . targets . map ( ( target ) =>
104122 this . #submitQuery( target , options . range ) . pipe (
105123 switchMap ( ( uri ) => {
106124 const searchJobId = uri . split ( '/' ) . pop ( ) ! ;
@@ -126,25 +144,48 @@ export class DataSource extends DataSourceApi<SearchQuery, ClpDataSourceOptions>
126144 }
127145 } ;
128146 } ) ;
129- } ) ,
130- map ( ( dataBuffer ) => ( { target, dataBuffer } ) )
147+ } )
131148 )
132149 ) ;
133150
134- return forkJoin ( observables ) . pipe (
135- map ( ( results ) => ( {
136- data : results . map ( ( { target, dataBuffer } ) => {
151+ const timestampColumnNamesObservables = options . targets . map ( ( target ) =>
152+ this . #fetchTimestampColumnNames( target . dataset ?? 'default' )
153+ ) ;
154+
155+ const dataframeObservables = options . targets . map ( ( target , i ) =>
156+ zip ( timestampColumnNamesObservables [ i ] , queryResultsObservables [ i ] ) . pipe (
157+ map ( ( [ timestampColumnNames , dataBuffer ] ) => {
158+ const fields = [ ] ;
159+
137160 const values = target . maxNumResults ? dataBuffer . slice ( 0 , target . maxNumResults ) : dataBuffer ;
161+ fields . push ( { name : 'body' , values, type : FieldType . string } ) ;
162+
163+ const [ timestampColumnName ] = timestampColumnNames ;
164+ if ( 'undefined' !== typeof timestampColumnName ) {
165+ const parsedValues = values . map ( ( line ) => JSON . parse ( line ) ) ;
166+ const timestamps = parsedValues . map ( ( parsedValue ) => {
167+ try {
168+ return this . #extractField( parsedValue , timestampColumnName ) ;
169+ } catch ( err : unknown ) {
170+ return null ;
171+ }
172+ } ) ;
173+ fields . push ( { name : 'timestamp' , values : timestamps , type : FieldType . time } ) ;
174+ }
175+
138176 return createDataFrame ( {
139177 refId : target . refId ,
140- fields : [ { name : target . refId , values , type : FieldType . string } ] ,
178+ fields : fields ,
141179 meta : {
142180 type : DataFrameType . LogLines ,
181+ preferredVisualisationType : 'logs' ,
143182 } ,
144183 } ) ;
145- } ) ,
146- } ) )
184+ } )
185+ )
147186 ) ;
187+
188+ return forkJoin ( dataframeObservables ) . pipe ( map ( ( data ) => ( { data } ) ) ) ;
148189 }
149190
150191 async testDatasource ( ) {
0 commit comments