sort

sort is one of the most commonly used search commands, and it shows up in all sorts of situations. Below are some really useful searches that combine sort with other search commands.

More than a day between events

<search>
| sort _time
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| fieldformat time_since_last = tostring(time_since_last, "duration")
| where time_since_last > 60*60*24

purpose:

find cases where more than a day elapsed between two consecutive events

requirements:

any events. the only field dependency is _time

comments:
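
a useful variant, assuming your events carry a host field (most Splunk events do): add a by clause to streamstats so gaps are measured per host instead of across the whole event stream:

<search>
| sort _time
| streamstats current=f global=f window=1 last(_time) as last_ts by host
| eval time_since_last = _time - last_ts
| where time_since_last > 60*60*24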

Time between events

<search>
| sort _time 
| streamstats current=f global=f window=1 last(_time) as last_ts 
| eval time_since_last = _time - last_ts 
| fieldformat time_since_last = tostring(time_since_last, "duration")

purpose:

add a field to each event which is the time between it and the previous event, i.e. the duration between events

requirements:

any data. the only field requirement in this search is _time

comments:
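
a quick way to see what the fieldformat line does, with no data needed: 90061 seconds renders as 1+01:01:01 (one day, one hour, one minute, one second), while the underlying value stays numeric so you can still do math on it:

| makeresults
| eval time_since_last = 90061
| fieldformat time_since_last = tostring(time_since_last, "duration")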

Detect Machines with High Threatscore

index=<replace>
| stats count by src_ip dst_ip dst_port protocol
| lookup threatscore clientip as dst_ip
| sort -threatscore
| where threatscore>0

purpose:

Detect machines/applications that are potentially infected and have active malware running on them. You can even use it to detect fraud, e.g. shopping-site orders coming from bad IPs

requirements:

machine data with external IPs + the IP Reputation app

comments:

  • Search your logs: index=<replace>
  • Make sure the fields are extracted fine; you can even let this run in realtime (looks cool): | stats count by src_ip dst_ip dst_port protocol
  • Now we enrich the data with the threat-score lookup (a sample stand-in lookup is sketched after this list): | lookup threatscore clientip as dst_ip
  • Now that there is a new field (threatscore), we want to show the IPs with the highest threat score first by sorting: | sort -threatscore
  • And now we only want to see the malicious connections instead of the good ones: | where threatscore>0
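
The threatscore lookup ships with the IP Reputation app, but if you just want to try the pipeline first, a hand-rolled CSV lookup of the same shape works; the IPs and scores below are made up (documentation address ranges):

clientip,threatscore
203.0.113.7,85
198.51.100.23,0

Save it as threatscore.csv and point a lookup definition named threatscore at it.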

Speed / Distance Login Anomaly

index=geod
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| where speed > 500
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find those pairs of consecutive events where the speed needed to cover the distance in the time between them is greater than 500 MPH

requirements:

haversine app; a clientip field containing the IP address

comments:
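
if you don't have the haversine app, the distance step can be approximated with plain eval math; this is a sketch of the standard haversine formula (3959 mi Earth radius), reusing the last_latlon field from the search above and the lat/lon fields that iplocation produces:

| eval lat1=tonumber(mvindex(split(last_latlon, ","), 0)), lon1=tonumber(mvindex(split(last_latlon, ","), 1))
| eval rlat1=pi()*lat1/180, rlat2=pi()*lat/180
| eval dlat=pi()*(lat-lat1)/180, dlon=pi()*(lon-lon1)/180
| eval a=sin(dlat/2)*sin(dlat/2)+cos(rlat1)*cos(rlat2)*sin(dlon/2)*sin(dlon/2)
| eval distance=3959*2*atan2(sqrt(a), sqrt(1-a))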

Auth anomaly (basic) with haversine

index=geod 
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find the speed needed to cover the distance between the IP locations of two consecutive login events

requirements:

haversine app; a clientip field containing the IP address

comments:
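
a worked example of the speed line: two logins 1,800 seconds (half an hour) apart, resolved by iplocation to points 1,000 miles apart, give speed = 1000/(1800/60/60) = 2,000 MPH, which no traveler can match; the same gap over 20 miles gives 40 MPH, which is perfectly plausible.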

song puzzle answer

index=music-puzzle sourcetype=test3
| rename song.parts{}.id as a__pid, song.parts{}.part as a__ppt, song.parts{}.seq as a__pseq
| eval tuples = mvzip(mvzip(a__pid, a__ppt, "~~"), a__pseq, "~~")
| fields - a__*
| mvexpand tuples
| rex field=tuples "(?<s_p_id>[^~]+)~~(?<s_p_text>[^~]+)~~(?<s_p_seq>[^~]+)"
| sort song.name, s_p_seq
| eval s_p_text = urldecode(s_p_text)
| stats list(s_p_text) by song.name

purpose:

reassemble each song from its URL-encoded parts, ordered by sequence number, and list the decoded text per song

requirements:

events with song.name and song.parts{} (id, part, seq) fields

comments:
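
the mvzip/mvexpand/rex combination is the workhorse here; a self-contained toy (made-up values) you can paste into any Splunk instance to watch the tuples get built and split back apart:

| makeresults
| eval id=split("1,2,3", ","), txt=split("do,re,mi", ",")
| eval tuples=mvzip(id, txt, "~~")
| fields tuples
| mvexpand tuples
| rex field=tuples "(?<seq>[^~]+)~~(?<text>[^~]+)"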

Search to end all errors

index=_internal sourcetype="splunkd" log_level="ERROR" 
| stats sparkline count dc(host) as hosts last(_raw) as last_raw_msg values(sourcetype) as sourcetype last(_time) as last_msg_time first(_time) as first_msg_time values(index) as index by punct 
| eval delta=round((first_msg_time-last_msg_time),2) 
| eval msg_per_sec=round((count/delta),2) 
| convert ctime(last_msg_time) ctime(first_msg_time) 
| table last_raw_msg count hosts sparkline msg_per_sec sourcetype index first_msg_time last_msg_time delta 
| sort -count

purpose:

identifies frequently occurring errors in your splunk instance. Long story short: knocking out the top 10 on this list will make your splunk instance very happy

requirements:

access to the _internal index

comments:
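
one caveat on the final sort: sort truncates to 10,000 results by default, so if you have more punct groups than that, use | sort 0 -count (0 means no limit).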

cumulative distribution function

| stats count by X
| eventstats sum(count) as total 
| eval probXi=count/total
| sort X
| streamstats sum(probXi) as CDF

purpose:

compute the empirical cumulative distribution function (CDF) of a field X, i.e. for each value of X the fraction of events less than or equal to it

requirements:

any data; replace X with the field of interest

comments:

props to Pierre Brunel
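
a self-contained demo, rolling a made-up six-sided die 1,000 times (random() and the % operator are standard eval features); the CDF column should climb to 1.0 at X=6:

| makeresults count=1000
| eval X=(random() % 6) + 1
| stats count by X
| eventstats sum(count) as total
| eval probXi=count/total
| sort X
| streamstats sum(probXi) as CDF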