eval

eval is a pretty popular search command and it shows up in all sorts of situations. Below are some really cool searches that use eval along with other search commands.

Authentication Anomalies via "area" algorithm

`authentication` 
| search ( action="success" ) 
| eval citycountry=src_city+", "+src_country 
| stats values(citycountry) as CityCountry, dc(citycountry) as loccount, max(src_lat) as maxlat, min(src_lat) as minlat, max(src_long) as maxlong, min(src_long) as minlong by user 
| eval delta_lat = abs(maxlat-minlat) 
| eval delta_long=abs(maxlong-minlong) 
| eval area= delta_lat * delta_long * loccount 
| where area > 1000

purpose:

Use 'area' to identify whether a given person could travel the distance between login events.

requirements:

ES app (or something with a matching `authentication` macro)

comments:

More than a day between events

<search>
| sort _time
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| fieldformat time_since_last = tostring(time_since_last, "duration")
| where time_since_last > 60*60*24

purpose:

find situations where there is more than a day between two events

requirements:

any events. the only field dependency is _time

comments:

Splunk Server's Time

* | head 1 | eval tnow = now() | fieldformat tnow=strftime(tnow, "%c %Z") | table tnow

purpose:

shows the time according to the splunk server

requirements:

comments:

Time between events

<search>
| sort _time 
| streamstats current=f global=f window=1 last(_time) as last_ts 
| eval time_since_last = _time - last_ts 
| fieldformat time_since_last = tostring(time_since_last, "duration")

purpose:

add a field to each event which is the time between this event and the previous one (the duration between events)

requirements:

any data. the only field requirement in this search is _time

comments:

Detect Clock Skew

| rest /services/server/info
| eval updated_t=round(strptime(updated, "%Y-%m-%dT%H:%M:%S%z"))
| eval delta_t=now()-updated_t
| eval delta=tostring(abs(delta_t), "duration")
| table serverName, updated, updated_t, delta, delta_t

purpose:

Check for server clock skew

requirements:

comments:

If delta is anything other than about 00:00:01 (which is easy to account for when processing a lot of indexers), you may have clock skew.

Simple Outlier Search

error |stats count by host| eventstats avg(count) as avg stdevp(count) as stdevp | eval ub=avg+2*stdevp, lb=avg-2*stdevp, is_outlier=if(count<lb, 1, if(count>ub, 1, 0)) | where is_outlier=1

purpose:

Find outliers - hosts that have an error count which is greater than two standard deviations away from the mean.

requirements:

hosts with errors. alternatively, you can alter the search (the part before the first pipe) to source just about anything else that you'd like to analyze.

comments:
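
For reference, the bounds the eval computes are the usual two-sigma limits around the mean (a note, not part of the original search):

ub = \mu + 2\sigma, \qquad lb = \mu - 2\sigma, \qquad \text{outlier} \iff \text{count} < lb \ \text{or}\ \text{count} > ub

where \mu is avg(count) and \sigma is stdevp(count) from eventstats. Swap the 2 for a 3 if you want a stricter threshold.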

Speed / Distance Login Anomaly

index=geod
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| where speed > 500
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find pairs of events where the speed needed to cover the distance in the time between them is greater than 500 MPH

requirements:

haversine app; a clientip field containing the IP address

comments:
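
The distance itself comes from the haversine app, not from eval. For reference, the great-circle distance it is based on is (with \varphi latitude and \lambda longitude in radians, and R the Earth's radius, roughly 3959 mi for units=mi; the app's exact implementation may differ):

d = 2R \arcsin\sqrt{\sin^2\tfrac{\varphi_2-\varphi_1}{2} + \cos\varphi_1 \cos\varphi_2 \sin^2\tfrac{\lambda_2-\lambda_1}{2}}

The speed eval then simply divides that distance by time_since_last converted to hours.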

Auth anomaly basic with haversine

index=geod 
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find the speed needed to cover the distance between the ip-location specified in two different login events

requirements:

haversine app; a clientip field containing the IP address

comments:

Extract SQL Insert Params

sourcetype=stream:mysql* query="insert into*" | rex "insert into \S* \((?<aaa>[^)]+)\) values \((?<bbb>[^)]+)\)" | rex mode=sed field=bbb "s/\\\\\"//g" | makemv aaa delim="," | makemv bbb delim="," | eval a_kvfield = mvzip(aaa, bbb) | extract jam_kv_extract | timechart span=1s per_second(m_value) by m_name

purpose:

extracts fields from a SQL Insert statement so that the values inserted into the database can be manipulated via splunk searches. In this case, it is used in conjunction with splunk stream & mysql, but should work with any source / database technology.

requirements:

comments:
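
A minimal, self-contained sketch of the extraction step, using makeresults to fake a single event (the jam_kv_extract transform and the timechart are specific to the original environment, so they are omitted; the sed expression is simplified because this fake event has plain rather than backslash-escaped quotes):

# fake one event that looks like a captured insert statement
| makeresults
| eval _raw="insert into users (name,age) values (\"bob\",\"42\")"
# pull out the column list and the value list
| rex "insert into \S* \((?<aaa>[^)]+)\) values \((?<bbb>[^)]+)\)"
# strip the quotes around the values
| rex mode=sed field=bbb "s/\"//g"
# turn both lists into multivalue fields and pair them up
| makemv aaa delim=","
| makemv bbb delim=","
| eval a_kvfield = mvzip(aaa, bbb)
| table aaa bbb a_kvfield

This yields a_kvfield values of "name,bob" and "age,42", which is what the kv extraction in the original search then turns into real field/value pairs.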

song puzzle answer

index=music-puzzle sourcetype=test3 | rename song.parts{}.id as a__pid, song.parts{}.part as a__ppt, song.parts{}.seq as a__pseq | eval tuples = mvzip(mvzip(a__pid, a__ppt, "~~"),a__pseq, "~~") | fields - a__* | mvexpand tuples | rex field=tuples "(?<s_p_id>[^~]+)~~(?<s_p_text>[^~]+)~~(?<s_p_seq>[^~]+)" | sort song.name, s_p_seq | eval s_p_text = urldecode(s_p_text) | stats list(s_p_text) by song.name

purpose:

requirements:

comments:

Search Golf - Episode 1

# source the events in chron order (so "start" is before "end")
index=cst sourcetype=mav-golf | reverse 
# add a line number / temp id to the events
| eval lc=1 | accum lc 
# extract a field to make it easier to deal with action
#  not really necessary in this example - could just search for "start" / "end"
| rex field=_raw "ID=\S\s(?<action>\S+)\s" | stats list(action) as action by ID, lc 
# find action=start for each identifier and join that back into each row
| join ID type=left [search index=cst sourcetype=mav-golf | reverse | eval lc=1 | accum lc | rex field=_raw "ID=\S\s(?<action>\S+)\s"  | search action=start | stats first(lc) as open by ID] 
# find action=end for each identifier and join that back into each row
| join ID type=left [search index=cst sourcetype=mav-golf | reverse | eval lc=1 | accum lc | rex field=_raw "ID=\S\s(?<action>\S+)\s"  | search action=end | stats last(lc) as close by ID] 
# lastly, test each event to see if its own id is between the start and end.
#  if so - count it.
| eval sc = if(lc>open, if(lc<close, 1, 0), 0) 
# And then sum up those events which should be counted.
| stats sum(sc) as num_events by ID

purpose:

Find the number of events within a sequence of events based on a shared identifier. Keywords ("start" and "end") mark the beginning and end of the sequence. The search cannot use the transaction command.

requirements:

Data like the following:
01/01/2014 01:01:00.003 ID=a start blah blah
01/01/2014 01:01:01.003 ID=d more blah blah
01/01/2014 01:01:02.003 ID=a end blah blah
01/01/2014 01:01:03.003 ID=b start blah blah
01/01/2014 01:01:04.003 ID=c start blah blah
01/01/2014 01:01:05.003 ID=y more blah blah
01/01/2014 01:01:05.006 ID=c more blah blah
01/01/2014 01:01:05.033 ID=c more blah blah
01/01/2014 01:01:06.003 ID=c end blah blah
01/01/2014 01:01:06.033 ID=b more blah blah
01/01/2014 01:01:07.003 ID=b end blah blah
01/01/2014 01:01:08.004 ID=c more blah blah
01/01/2014 01:01:09.005 ID=b more blah blah

comments:

json mv extraction

...
# clean up some field names for ease of typing later
| rename events{}.code AS a_c, events{}.message AS a_m, events{}.timestamp AS a_ts, events{}.priority as a_p
# combine mv fields together using mvzip (to get tuples as comma-delim'd strings)
| eval b_combined = mvzip(mvzip(mvzip(a_c, a_m), a_ts), a_p)
# get rid of the a_* fields, simply b/c we don't need them clogging up the ui
| fields - a_*
# expand out the combined fields
| mvexpand b_combined
# extract nicely named fields from the results (using the comma from mvzip as the delimiter)
| rex field=b_combined "(?<e_c>[^,]*),(?<e_m>[^,]*),(?<e_ts>[^,]*),(?<e_p>[^,]*)"
# get rid of the combined field b/c we don't need it
| fields - b_*
# urldecode the field that you care about
| eval e_m = urldecode(e_m)

purpose:

requirements:

some json data with a pretty specific structure

comments:

Search to end all errors

index=_internal sourcetype="splunkd" log_level="ERROR" 
| stats sparkline count dc(host) as hosts last(_raw) as last_raw_msg values(sourcetype) as sourcetype last(_time) as last_msg_time first(_time) as first_msg_time values(index) as index by punct 
| eval delta=round((first_msg_time-last_msg_time),2) 
| eval msg_per_sec=round((count/delta),2) 
| convert ctime(last_msg_time) ctime(first_msg_time) 
| table last_raw_msg count hosts sparkline msg_per_sec sourcetype index first_msg_time last_msg_time delta  | sort -count

purpose:

identifies frequently occurring errors in your splunk instance. Long story short: knocking out the top 10 on this list will make your splunk instance very happy

requirements:

comments:

Chart HTTP Status Category % by URL

index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | rangemap field=status code_100=100-199 code_200=200-299 code_300=300-399 code_400=400-499 code_500=500-599 | rename range as stcat | stats count as sct by route, stcat |  eventstats sum(sct) as ttl by route | eval pct = round((sct/ttl), 2)."%" | xyseries route stcat pct

purpose:

creates a table where the rows are URL values, the columns are HTTP status categories and the cells are the percentage for that status / url combination

requirements:

comments:

Chart HTTP Status Category % by URL (using join)

index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | stats count as scount by route, status_type | join route [search index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | stats count as ttl by route] | eval pct = round((scount / ttl), 2)."%" | xyseries route status_type pct

purpose:

requirements:

comments:

cumulative distribution function

| stats count by X
| eventstats sum(count) as total
| eval probXi=count/total
| sort X
| streamstats sum(probXi) as CDF

purpose:

requirements:

comments:

props to Pierre Brunel
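
For reference, this is the empirical distribution of whatever field you put in place of X (with c_i the count for value x_i and N the total event count):

P(X = x_i) = \frac{c_i}{N}, \qquad F(x_k) = \sum_{x_i \le x_k} P(X = x_i)

stats count by X gives c_i, eventstats gives N, probXi is the empirical probability, and streamstats accumulates it in ascending order of X to produce the CDF column.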

Create a Normal Curve

| makeresults count=50000
| eval r = random() / (pow(2,31)-1)
| eval r2 = random() / (pow(2,31)-1)
| eval normal = sqrt(-2 * ln(r)) * cos(2 * pi() * r2)
| bin normal span=0.1
| stats count by normal
| makecontinuous normal

purpose:

requirements:

comments:

Props to Alexander (Xander) Johnson
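
What the search does, for reference, is a Box-Muller transform: given two independent uniform samples r and r2 on (0,1],

z = \sqrt{-2\ln r}\,\cos(2\pi r_2)

is a standard normal deviate. random() returns an integer between 0 and 2^31-1, so dividing by pow(2,31)-1 rescales it to [0,1]; bin and makecontinuous then turn the 50,000 samples into a histogram you can chart.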

Combine dbinspect and REST api data for buckets

| dbinspect index=*
| foreach * [eval dbinspect_<<FIELD>> = '<<FIELD>>']
| table dbinspect_*
| append [
  | rest splunk_server_group=dmc_group_cluster_master "/services/cluster/master/buckets"
  | foreach * [eval rest_api_<<FIELD>> = '<<FIELD>>']
  | table rest_api_* 
  ]
| eval bucketId=if(isnull(rest_api_title),dbinspect_bucketId,rest_api_title)
| stats values(*) as * by bucketId
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=""]
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=if("<<MATCHSEG1>>"=dbinspect_bucketId,'<<FIELD>>','<<MATCHSEG2>>')]
| fields - rest_api_peers.*

purpose:

requirements:

Needs to be executed on a search head that can query the cluster master REST API

comments:

The dbinspect API doesn't return consistent information about the size of buckets.

Work out data volumes by source type

| metadata type=sourcetypes
| noop sample_ratio=1000
| append [ search index=*  
   | eval size=length(_raw) 
   | stats avg(size) as average_event_size by sourcetype index
   ]
| stats values(totalCount) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day

purpose:

Efficiently calculate how much data is being indexed per day by source type. Very useful for calculating enterprise security data volumes

requirements:

Requires 6.3.x or later for the event sampling feature

comments:

Combines results from | metadata for counts and then multiplies this by the average event size. Automatically accounts for time ranges. You will need to modify the sample rate to be suitable for your data volume. The metadata search turns out to be very approximate and counts the values associated with buckets; if you have buckets which are open for a very long time, it will take the value for the entire period of the bucket, not the period of your search time range. Consider using tstats if this is an issue in your environment.
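
A minimal sketch of that tstats variant (the index=* filter is an assumption; event sampling is left out here, so either add it back with noop sample_ratio or narrow the appended subsearch to keep it cheap):

| tstats count as total_events where index=* by sourcetype
| append [ search index=*
   | eval size=length(_raw)
   | stats avg(size) as average_event_size by sourcetype ]
| stats values(total_events) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day

tstats counts events within the selected time range directly, which avoids the long-lived-bucket problem described above.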

Indexes my user can search.

| rest /services/data/indexes
| search
    [
    | rest /services/data/indexes
    | dedup title
    | table title
    | search
        [
            | rest splunk_server=local /services/authorization/roles
            | search
                [
                        | rest splunk_server=local /services/authentication/users
                        | search
                            [
                                        | rest /services/authentication/current-context
                                        | search type=splunk
                                        | table username
                                        | rename username as title ]
| fields roles
| mvexpand roles
| rename roles as title] imported_srchIndexesAllowed=*
| table imported_srchIndexesAllowed
| rename imported_srchIndexesAllowed as title
| mvexpand title] ]
| stats values(splunk_server) as splunkserver by title
| eval splunkserver=mvjoin(splunkserver,":")
| lookup non_internal_indexes title as title OUTPUT description as description
| fields title, description, splunkserver
| rename title AS Index

purpose:

requirements:

lookup csv with columns "title" of Index and "description"

comments:

We use this in a ~200 user environment to show users which indexes they are allowed to search.

Pearson Correlation Coefficient

index=*
| fields x y
| eval n=1 | eval xx=x*x | eval xy=x*y | eval yy=y*y
| addcoltotals  | tail 1
| eval rho_xy=(xy/n-x/n*y/n)/(sqrt(xx/n-(x/n)*(x/n))*sqrt(yy/n-(y/n)*(y/n)))
| fields rho_xy

purpose:

requirements:

comments:

This SPL query calculates the Pearson correlation coefficient of two fields named x and y.
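
For reference, addcoltotals leaves each field holding its column sum over the rows (n itself comes from summing the eval n=1 column), so dividing by n turns every term into a mean and the final eval is the textbook formula:

\rho_{xy} = \frac{\overline{xy} - \bar{x}\,\bar{y}}{\sqrt{\overline{x^2} - \bar{x}^2}\;\sqrt{\overline{y^2} - \bar{y}^2}}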

How many days in this month?

 | makeresults 
 | eval days_in_month=mvindex(split(if(tonumber(strftime(_time,"%y"))%4=0,"31,29,31,30,31,30,31,31,30,31,30,31","31,28,31,30,31,30,31,31,30,31,30,31"),","),tonumber(strftime(_time,"%m"))-1)

purpose:

Given _time, how many days are in this month: 31? 30? 28? 29? This eval statement uses a lookup held in a multi-value array to pull out the value in a computationally efficient manner.

requirements:

comments:

The eval expression is the solution; the example only uses makeresults to create sample data.
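
An alternative sketch, not from the original, that gets the same answer by snapping to month boundaries with relative_time (this also respects the century leap-year rule; round() absorbs any DST hour):

| makeresults
| eval days_in_month=round((relative_time(_time,"@mon+1mon")-relative_time(_time,"@mon"))/86400)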

Size distribution of my auto_high_volume buckets

| dbinspect [
  | rest /services/data/indexes      
  | eval index=title      
  | stats values(maxDataSize) as maxDataSize by index      
  | where maxDataSize="auto_high_volume"      
  | eval index="index=".index      
  | stats values(index) as indexes      
  | mvcombine delim=" " indexes     
  | eval search=indexes ] 
| bin sizeOnDiskMB span=2log4 
| chart limit=0 count by sizeOnDiskMB index

purpose:

requirements:

comments:

This search was developed to visualise if buckets were being rolled early.

User Search Restrictions and Last Logins

| rest /services/authentication/users splunk_server=local
| table title realname email roles 
| mvexpand roles 
| join roles type=outer 
    [| rest /services/authorization/roles splunk_server=local
    | fields imported* title srch* 
    | fields - *Quota *TimeWin *capabilities 
    | rex mode=sed field=srchFilter "s/^\*$/true/" 
    | rex mode=sed field=imported_srchFilter "s/^\*$/true/" 
    | eval search_restrictions=if(imported_srchFilter="","( ".srchFilter." )",if(srchFilter="","( ".imported_srchFilter." )","( ".srchFilter." ) OR ( ".imported_srchFilter." ) ")) 
    | fields - srchIndexesDefault 
    | eval srchIndexesAllowed_new=mvjoin(srchIndexesAllowed," OR index=") 
    | eval index_restrictions="( index=".srchIndexesAllowed_new." )" 
    | table title search_restrictions index_restrictions 
    | rename title AS roles ] 
| stats values(roles) AS roles values(search_restrictions) AS search_restrictions values(index_restrictions) AS index_restrictions by title,realname,email 
| rex mode=sed field=search_restrictions "s/\( \)//g" 
| eval search_restrictions=mvjoin(search_restrictions," OR ") 
| eval index_restrictions=mvjoin(index_restrictions," OR ") 
| eval realname=upper(realname)
| rename title AS username
| eval restrictions=if(search_restrictions="( )",index_restrictions,search_restrictions." ".index_restrictions)
| table username email roles restrictions
| search username=* 
| rex mode=sed field=restrictions "s/^ OR //g" 
| join type=outer username 
    [ search (index=_audit "login attempt" info=succeeded) OR (index=_internal "GET /splunk/en-US/account/login") 
    | stats max(_time) AS "last_login" by user 
    | rename user AS username ] 
| eval days_ago=floor((now()-last_login)/86400) 
| eval last_login=strftime(last_login,"%Y-%m-%d %H:%M:%S") 
| fillnull last_login value="Never"

purpose:

requirements:

comments:

Shows a table of the inherited search and index restrictions for each user, represented as an SPL string. In certain cases it may not be 100% accurate, but it covers most instances that I have run into. NOTE: does not account for `searchFilterSelecting = false` defined in authorize.conf

Show links between tags, sourcetypes, apps and datamodels.

| rest splunk_server=local count=0 /servicesNS/-/-/admin/datamodel-files 
| spath input=eai:data output=base_search path=objects{}.baseSearch 
| spath input=eai:data output=constraints path=objects{}.constraints{}.search 
| eval tag_content = mvappend(base_search,constraints) 
| rex max_match=0 field=tag_content "tag=\"?(?<tag_name>\w+)\"?" 
| mvexpand tag_name 
| rename title AS datamodel 
| append 
    [| rest splunk_server=local count=0 /servicesNS/-/-/admin/eventtypes 
    | rename eai:acl.app AS app tags AS tag_name 
    | search app="*TA*" 
    | rex max_match=0 field=search "sourcetype=\"?(?<sourcetype>[^\s\"^)]+)\"?" 
    | mvexpand sourcetype 
    | mvexpand tag_name 
    | eval app_sourcetype=mvzip(app,sourcetype,"__") 
    | stats list(tag_name) as tag_name by app, sourcetype,app_sourcetype ] 
| stats list(datamodel) as datamodel, list(app) as app, list(app_sourcetype) as app_sourcetype by tag_name 
| search datamodel=* 
| stats values(datamodel) as datamodel, values(tag_name) as tags by app_sourcetype 
| eval tags=mvdedup(tags) 
| rex max_match=0 field=app_sourcetype "\"?(?<app>.+)__\"?" 
| rex max_match=0 field=app_sourcetype "__\"?(?<sourcetype>.+)\"?" 
| fields - app_sourcetype

purpose:

This search answers the question: which dashboards will my new add-on be used for in Enterprise Security?

requirements:

comments:

This is useful to see which app populates which datamodel in Enterprise Security, or any other environment which uses datamodels.

calculate duration without weekends

| makeresults count=5 
| eval aging=random()%25 
| eval end=_time 
| eval start=end-(aging*86400) 
| eval range=mvrange(start, end, 86400) 
| convert ctime(range) timeformat="%+" 
| eval BusinessDays=mvcount(mvfilter(NOT match(range,"(Sun|Sat).*")))

purpose:

calculate a duration excluding weekend days

requirements:

comments:

If you have a start and end time, this calculates the duration in business days (excluding weekends) using mvrange.

detect time problems (index time and event time being really different)

sourcetype=*
| eval diff=_indextime-_time| eval indextime=strftime(_indextime,"%Y-%m-%d %H:%M:%S")
| table indextime _time diff sourcetype
| timechart avg(diff) max(diff) by sourcetype

purpose:

requirements:

comments:

this search simply graphs the time difference between the event time and the indexing time by sourcetype; this helps troubleshoot sourcetype time issues.