where

It's a pretty popular search command, used in all sorts of situations. Below are some really cool searches that use where along with other search commands.

Detect Machines with High Threatscore

index=<replace> | stats count by src_ip dst_ip dst_port protocol | lookup threatscore clientip as dst_ip | sort -threatscore | where threatscore>0

purpose:

Detect machines/applications that are potentially infected and have active malware running on them. You can even use it to detect fraud, e.g. shopping-site orders coming from bad IPs.

requirements:

machine data with external IPs + the IP Reputation app

comments:

  • Search the logs: index=<replace>
  • Make sure the fields are extracted correctly; you can even let this run in real time, which looks cool: | stats count by src_ip dst_ip dst_port protocol
  • Enrich the data with the IP reputation lookup: | lookup threatscore clientip as dst_ip
  • Now that a new field (threatscore) is evaluated, show the IPs with the highest threatscore first by sorting on it: | sort -threatscore
  • Finally, keep only the malicious connections instead of the good ones: | where threatscore>0 (the full pipeline is written out below)
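
Putting those bullets together, here is the same pipeline written out over multiple lines for readability (the index name is still a placeholder, and the threatscore lookup comes from the IP Reputation app mentioned above):

index=<replace>
# count connections per src/dst/port/protocol tuple
| stats count by src_ip dst_ip dst_port protocol
# enrich each destination IP with its reputation score
| lookup threatscore clientip as dst_ip
# worst reputations first
| sort -threatscore
# keep only the suspicious connections
| where threatscore>0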

Unauthorized Foreign Activity

layout=edit | geoip clientip as clientip | table _time clientip client_country | where NOT (client_country="Germany" OR client_country="Austria" OR client_country="Switzerland")

purpose:

Detect unauthorized admin activity via foreign country

requirements:

Logs with external source IP's

comments:

  • Search for admin activity, for example "layout=edit" in the CMS system on my webpage
  • Display all the IPs with | table clientip _time
  • Enrich them with the geoip lookup (geoip clientip)
  • Display all changes with geo information:
    • layout=edit | lookup geoip clientip as clientip | table _time clientip client_country
  • Review them and create a simple whitelist: | where NOT (client_country="Germany" OR client_country="Austria" OR client_country="Switzerland") (the full search is written out below)
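
For readability, the same whitelist search written out over multiple lines (the country list is just the example whitelist from above):

layout=edit
# enrich the client IP with geo information
| lookup geoip clientip as clientip
# show when and from where each change was made
| table _time clientip client_country
# drop everything coming from whitelisted countries
| where NOT (client_country="Germany" OR client_country="Austria" OR client_country="Switzerland")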

geo-location w/ user home base lookup

index=geod
# get some location information
| iplocation clientip
# lookup user details from a lookup table
#  including their home location
| lookup user_home_lu user as user
# calculate the distance between the login location
#  and the user's home location
#  using the haversine app (http://apps.splunk.com/app/936/)
| haversine originField=home_latlon units=mi inputFieldLat=lat inputFieldLon=lon
# limit the list to those where the distance is greater
#  than 500 miles
| where distance > 500
# clean up for reporting purposes
| strcat City ", " Region cs
# report the results
| fields user, cs, distance

purpose:

find users that are logging in from a location which is greater than 500 miles away from the registered home office

requirements:

haversine app; a clientip field; a lookup table mapping user > home_latlon

comments:
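
As a sketch, the user_home_lu lookup could be a simple CSV along these lines (field names are taken from the search above; the user names and coordinates are made up for illustration, and this assumes the haversine app reads originField as a "lat,lon" pair, the same format strcat builds for last_latlon in the speed search below):

user,home_latlon
alice,"37.7749,-122.4194"
bob,"51.5074,-0.1278"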

json spath w/ date

... | spath input=message | where strptime('updated_at', "%Y-%m-%d %H:%M:%S %z") > strptime("2013-08-07 00:00:00", "%Y-%m-%d %H:%M:%S")

purpose:

searches for events which contain a field called "message". That field contains a JSON payload and is expanded via a call to spath. A value from the resulting expansion is then used to find events whose date meets certain criteria.

requirements:

comments:
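
A quick way to see the spath + strptime comparison in action is a self-contained test with makeresults (the JSON payload and the cutoff date here are invented for illustration):

| makeresults
| eval message="{\"id\": 1, \"updated_at\": \"2013-08-09 12:34:56 +0000\"}"
| spath input=message
| where strptime('updated_at', "%Y-%m-%d %H:%M:%S %z") > strptime("2013-08-07 00:00:00", "%Y-%m-%d %H:%M:%S")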

Speed / Distance Login Anomaly

index=geod
| iplocation clientip
# order events oldest first so streamstats sees the previous login
| sort _time
# build a "lat,lon" string for the current event
| strcat lat "," lon latlon
# pull the location and time of the previous event
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
# distance in miles between the previous and current location
| haversine originField=last_latlon outputField=distance units=mi latlon
# speed in MPH; avoid dividing by zero on the first event
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| where speed > 500
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find pairs of consecutive events where the speed required to cover the distance in the time between them is greater than 500 MPH

requirements:

haversine app; a clientip field

comments:
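
To make the speed check concrete: two logins 30 minutes apart from locations 600 miles apart give 600 / 0.5 = 1,200 MPH, well above the 500 MPH threshold, so that pair of events is flagged.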

XML with spath

index=demo1 sourcetype=xml-log-data | spath input=message | where strptime('message.updated_at', "%Y-%m-%d %H:%M:%S %z") > strptime("2013-08-07 00:00:00", "%Y-%m-%d %H:%M:%S")

purpose:

searches for events which contain a field called "message". That composite field is expanded via a call to spath, and a value from the resulting expansion is used to find events whose date meets certain criteria.

requirements:

comments:

Authentication Anomalies via "area" algorithm

`authentication` 
| search ( action="success" ) 
| eval citycountry=src_city+", "+src_country 
| stats values(citycountry) as CityCountryValues, dc(citycountry) as loccount, max(src_lat) as maxlat, min(src_lat) as minlat, max(src_long) as maxlong, min(src_long) as minlong by user 
| eval delta_lat = abs(maxlat-minlat) 
| eval delta_long=abs(maxlong-minlong) 
| eval area= delta_lat * delta_long * loccount 
| where area > 1000

purpose:

Use 'area' to identify whether a given person could travel the distance between login events.

requirements:

ES app (or something that provides a matching `authentication` macro)

comments:
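
As a rough sanity check of the formula: a user whose successful logins span 10 degrees of latitude and 20 degrees of longitude across 6 distinct city/country pairs gets area = 10 * 20 * 6 = 1200, which clears the 1000 threshold, while a user who only ever logs in from one city has delta_lat = delta_long = 0 and is never flagged.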

More than a day between events

<search>
| sort _time
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| fieldformat time_since_last = tostring(time_since_last, "duration")
| where time_since_last > 60*60*24

purpose:

find situations where there is more than a day between two events

requirements:

any events. the only field dependency is _time

comments:
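
The threshold is plain arithmetic: 60*60*24 = 86,400 seconds, so the where clause keeps only events whose gap to the previous event exceeds one day, and the fieldformat renders that gap as a human-readable duration.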

Simple Outlier Search

error | stats count by host | eventstats avg(count) as avg stdevp(count) as stdevp | eval ub=avg+2*stdevp, lb=avg-2*stdevp, is_outlier=if(count<lb, 1, if(count>ub, 1, 0)) | where is_outlier=1

purpose:

Find outliers - hosts that have an error count which is greater than two standard deviations away from the mean.

requirements:

hosts with errors. Alternatively, you can alter the search (before the first pipe) to source just about anything else that you'd like to analyze.

comments:
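
For instance, if the average error count across hosts is 100 with a population standard deviation of 20, the bounds are lb = 100 - 40 = 60 and ub = 100 + 40 = 140; any host with fewer than 60 or more than 140 errors is flagged as an outlier.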

Size distribution of my auto_high_volume buckets

| dbinspect [
  | rest /services/data/indexes      
  | eval index=title      
  | stats values(maxDataSize) as maxDataSize by index      
  | where maxDataSize="auto_high_volume"      
  | eval index="index=".index      
  | stats values(index) as indexes      
  | mvcombine delim=" " indexes     
  | eval search=indexes ] 
| bin sizeOnDiskMB span=2log4 
| chart limit=0 count by sizeOnDiskMB index

purpose:

requirements:

comments:

This search was developed to visualise if buckets were being rolled early.
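
If I read the bin log-span syntax right, span=2log4 puts sizeOnDiskMB into logarithmic buckets whose boundaries grow by a factor of 4 from a coefficient of 2 (roughly 0-2, 2-8, 8-32, 32-128 MB and so on), which makes undersized buckets stand out next to the ones that reached their full auto_high_volume size.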

detect time problems (index time and event time being really different)

sourcetype=*
| eval diff=_indextime-_time
| eval indextime=strftime(_indextime,"%Y-%m-%d %H:%M:%S")
| table indextime _time diff sourcetype
| timechart avg(diff) max(diff) by sourcetype

purpose:

requirements:

comments:

This search simply graphs the time difference between the event time and the indexing time by sourcetype; this helps troubleshoot sourcetype time issues.
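
A large positive diff means events are being indexed well after their own timestamps (indexing lag or late-arriving data); a negative diff means the event timestamps are ahead of the indexer's clock, which usually points at timezone or timestamp-extraction problems.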