stats

stats is a pretty popular search command, used in all sorts of situations. Below are some really cool searches that use stats along with other search commands.

Authentication Anomalies via "area" algorithm

`authentication` 
| search ( action="success" ) 
| eval citycountry=src_city+", "+src_country 
| stats values(citycountry) as CityCountry, dc(citycountry) as loccount, max(src_lat) as maxlat, min(src_lat) as minlat, max(src_long) as maxlong, min(src_long) as minlong by user 
| eval delta_lat = abs(maxlat-minlat) 
| eval delta_long=abs(maxlong-minlong) 
| eval area= delta_lat * delta_long * loccount 
| where area > 1000

purpose:

Use the computed 'area' (the bounding box spanned by a user's login latitudes and longitudes, multiplied by the number of distinct locations) to flag users whose logins are too far apart geographically for one person to plausibly travel between the login events.

requirements:

ES app (or something with a matching `authentication` macro)

comments:

Detect Account Sharing

…. | stats dc(src_ip) as ip_count by user

purpose:

Detect users who log in from multiple IPs / user account sharing

requirements:

Login logs with Username + Source IP field extractions

comments:

  • … - first search for something, maybe with logon/login keywords, and review whether the proper login logs are there and the field extractions are working
  • Then use stats to show the distinct count of source IPs used per user: | stats dc(src_ip) as ip_count by user (a fuller sketch follows below)
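
A minimal end-to-end sketch of this idea; the index name, the success filter, and the threshold of 5 distinct IPs are all assumptions to adjust for your environment:

index=authentication_logs action=success
| stats dc(src_ip) as ip_count values(src_ip) as src_ips by user
| where ip_count > 5
| sort - ip_count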

Detect Machines with High Threatscore

index=<replace> | stats count by src_ip dst_ip dst_port protocol | lookup threatscore clientip as dst_ip | sort -threatscore | where threatscore>0

purpose:

Detect machines/applications that are potentially infected and have active malware running on them. You can even use it to detect fraud, e.g. shopping-site orders coming from bad IPs.

requirements:

machine data with external IP's + IP Reputation App

comments:

  • Search Logs index=
  • Make sure the fields are extracting properly; you can even let this run in real time, which looks cool: | stats count by src_ip dst_ip dst_port protocol
  • Now we enrich the data with | lookup threatscore clientip as dst_ip
  • Now that a new field (threatscore) has been evaluated, we want to show the IPs with the highest threatscore first by sorting: | sort -threatscore
  • And now we only want to see the malicious connections instead of the good ones: | where threatscore>0 (a per-machine roll-up sketch follows below)
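
A hedged variation that rolls the results up to one row per internal source machine (same placeholder index, lookup, and fields as above):

index=<replace>
| stats count by src_ip dst_ip dst_port protocol
| lookup threatscore clientip as dst_ip
| where threatscore>0
| stats dc(dst_ip) as bad_destinations sum(count) as connections max(threatscore) as max_threatscore by src_ip
| sort -max_threatscore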

Simple Outlier Search

error | stats count by host | eventstats avg(count) as avg stdevp(count) as stdevp | eval ub=avg+2*stdevp, lb=avg-2*stdevp, is_outlier=if(count<lb, 1, if(count>ub, 1, 0)) | where is_outlier=1

purpose:

Find outliers - hosts that have an error count which is greater than two standard deviations away from the mean.

requirements:

Hosts with errors. Alternatively, you can alter the base search (before the first pipe) to source just about anything else that you'd like to analyze.

comments:
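
The two-standard-deviation threshold is arbitrary. A sketch that parameterizes the multiplier and keeps the bounds next to each outlier (the value of 3 is just an example):

error
| stats count by host
| eventstats avg(count) as avg stdevp(count) as stdevp
| eval multiplier=3
| eval ub=avg+multiplier*stdevp, lb=avg-multiplier*stdevp
| where count>ub OR count<lb
| table host count avg lb ub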

song puzzle answer

index=music-puzzle sourcetype=test3 | rename song.parts{}.id as a__pid, song.parts{}.part as a__ppt, song.parts{}.seq as a__pseq | eval tuples = mvzip(mvzip(a__pid, a__ppt, "~~"),a__pseq, "~~") | fields - a__* | mvexpand tuples | rex field=tuples "(?<s_p_id>[^~]+)~~(?<s_p_text>[^~]+)~~(?<s_p_seq>[^~]+)" | sort song.name, s_p_seq | eval s_p_text = urldecode(s_p_text) | stats list(s_p_text) by song.name

purpose:

Reassemble each song from its parts: zip the part id, text, and sequence fields together, expand them into individual results, sort by sequence, URL-decode the text, and list the parts in order per song name.

requirements:

comments:

Search Golf - Episode 1

# source the events in chron order (so "start" is before "end")
index=cst sourcetype=mav-golf | reverse 
# add a line number / temp id to the events
| eval lc=1 | accum lc 
# extract a field to make it easier to deal with action
#  not really necessary in this example - could just search for "start" / "end"
| rex field=_raw "ID=\S\s(?<action>\S+)\s" | stats list(action) as action by ID, lc 
# find action=start for each identifier and join that back into each row
| join ID type=left [search index=cst sourcetype=mav-golf | reverse | eval lc=1 | accum lc | rex field=_raw "ID=\S\s(?<action>\S+)\s"  | search action=start | stats first(lc) as open by ID] 
# find action=end for each identifier and join that back into each row
| join ID type=left [search index=cst sourcetype=mav-golf | reverse | eval lc=1 | accum lc | rex field=_raw "ID=\S\s(?<action>\S+)\s"  | search action=end | stats last(lc) as close by ID] 
# lastly, test each event to see if its own line number falls between the start and end.
#  if so - count it.
| eval sc = if(lc>open, if(lc<close, 1, 0), 0) 
# And then sum up those events which should be counted.
| stats sum(sc) as num_events by ID

purpose:

Find the number of events within a sequence of events based on a shared identifier. Keywords ("start" and "end") mark the beginning and end of the sequence. The search cannot use the transaction command.

requirements:

Data like the following:
01/01/2014 01:01:00.003 ID=a start blah blah
01/01/2014 01:01:01.003 ID=d more blah blah
01/01/2014 01:01:02.003 ID=a end blah blah
01/01/2014 01:01:03.003 ID=b start blah blah
01/01/2014 01:01:04.003 ID=c start blah blah
01/01/2014 01:01:05.003 ID=y more blah blah
01/01/2014 01:01:05.006 ID=c more blah blah
01/01/2014 01:01:05.033 ID=c more blah blah
01/01/2014 01:01:06.003 ID=c end blah blah
01/01/2014 01:01:06.033 ID=b more blah blah
01/01/2014 01:01:07.003 ID=b end blah blah
01/01/2014 01:01:08.004 ID=c more blah blah
01/01/2014 01:01:09.005 ID=b more blah blah

comments:
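
A join-free sketch of the same approach, using eventstats with eval expressions to find the start and end line numbers per ID (assumes the same index, sourcetype, and rex as above):

index=cst sourcetype=mav-golf 
| reverse 
| eval lc=1 | accum lc 
| rex field=_raw "ID=\S\s(?<action>\S+)\s" 
| eventstats min(eval(if(action="start",lc,null()))) as open max(eval(if(action="end",lc,null()))) as close by ID 
| eval sc=if(lc>open AND lc<close, 1, 0) 
| stats sum(sc) as num_events by ID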

Machines with Multiple Services

index=firewalltraffic | stats count by src_ip dst_ip dst_port protocol | stats dc(dst_port) as "Different Ports" by dst_ip

purpose:

Detect machines offering multiple services

requirements:

Firewall Traffic and extracted source/destination IP + SRC_Port/DST_Port

comments:

  • Search Firewall Logs index=
  • Make sure the fields are extracting properly; you can even let this run in real time, which looks cool: | stats count by src_ip dst_ip dst_port protocol
    • You can also use this to drill down, e.g. filter only on FTP traffic (port 21), SSH traffic, web, SMTP, or show what the Active Directory domain controllers are doing by SRC/DST IP, etc.
  • Now we only want to see which IPs are offering services on how many different ports: | stats dc(dst_port) as "Different Ports" by dst_ip
  • You can also swap dst_ip with src_ip so you can see which host is consuming the most different services
  • You can also filter it down with an additional | where 'Different Ports' > 5 (a field name containing a space needs single quotes in where); see the assembled sketch below
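
A minimal assembled sketch, using the index from the example above and the threshold of 5 from the last bullet:

index=firewalltraffic
| stats count by src_ip dst_ip dst_port protocol
| stats dc(dst_port) as "Different Ports" by dst_ip
| where 'Different Ports' > 5
| sort - "Different Ports"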

Search to end all errors

index=_internal sourcetype="splunkd" log_level="ERROR" 
| stats sparkline count dc(host) as hosts last(_raw) as last_raw_msg values(sourcetype) as sourcetype last(_time) as last_msg_time first(_time) as first_msg_time values(index) as index by punct 
| eval delta=round((first_msg_time-last_msg_time),2) 
| eval msg_per_sec=round((count/delta),2) 
| convert ctime(last_msg_time) ctime(first_msg_time) 
| table last_raw_msg count hosts sparkline msg_per_sec sourcetype index first_msg_time last_msg_time delta  | sort -count

purpose:

Identifies frequently occurring errors in your Splunk instance. Long story short: knocking out the top 10 on this list will make your Splunk instance very happy.

requirements:

comments:
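
To dig into a single error pattern from the table, you could drill down on its punct value; the quoted value below is a placeholder to paste from your own results:

index=_internal sourcetype="splunkd" log_level="ERROR" punct="<paste a punct value from the table above>" 
| table _time host _raw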

Chart HTTP Status Category % by URL

index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | rangemap field=status code_100=100-199 code_200=200-299 code_300=300-399 code_400=400-499 code_500=500-599 | rename range as stcat | stats count as sct by route, stcat | eventstats sum(sct) as ttl by route | eval pct = round((sct/ttl)*100, 2)."%" | xyseries route stcat pct

purpose:

creates a table where the rows are URL values, the columns are HTTP status categories and the cells are the percentage for that status / url combination

requirements:

comments:

Chart HTTP Status Category % by URL (using join)

index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | stats count as scount by route, status_type | join route [search index=* sourcetype=access* status=* | rex field=bc_uri "/(?<route>[^/]*)/" | stats count as ttl by route] | eval pct = round((scount / ttl)*100, 2)."%" | xyseries route status_type pct

purpose:

requirements:

comments:

cumulative distribution function

| stats count by X
| eventstats sum(count) as total 
| eval probXi=count/total
| sort X
| streamstats sum(probXi) as CDF

purpose:

Compute the empirical cumulative distribution function (CDF) of a field X: the running probability that X takes a value less than or equal to each observed value.

requirements:

comments:

props to Pierre Brunel
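
As a concrete usage sketch, with the placeholder X replaced by an illustrative field from web access logs (the index, sourcetype, and field name are assumptions):

index=web sourcetype=access_combined
| stats count by bytes
| eventstats sum(count) as total 
| eval probXi=count/total
| sort bytes
| streamstats sum(probXi) as CDF
| table bytes CDF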

Create a Normal Curve

| makeresults count=50000
| eval r = random() / (pow(2,31)-1)
| eval r2 = random() / (pow(2,31)-1)
| eval normal = sqrt(-2 * ln(r)) * cos(2 * pi() * r2)
| bin normal span=0.1
| stats count by normal
| makecontinuous normal

purpose:

Generate a histogram of 50,000 samples drawn from a standard normal distribution, using the Box-Muller transform to turn two uniform random values into one normally distributed value.

requirements:

comments:

Props to Alexander (Xander) Johnson

Combine dbinspect and REST api data for buckets

| dbinspect index=*
| foreach * [eval dbinspect_<<FIELD>> = '<<FIELD>>']
| table dbinspect_*
| append [
  | rest splunk_server_group=dmc_group_cluster_master "/services/cluster/master/buckets"
  | foreach * [eval rest_api_<<FIELD>> = '<<FIELD>>']
  | table rest_api_* 
  ]
| eval bucketId=if(isnull(rest_api_title),dbinspect_bucketId,rest_api_title)
| stats values(*) as * by bucketId
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=""]
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=if("<<MATCHSEG1>>"=dbinspect_bucketId,'<<FIELD>>','<<MATCHSEG2>>')]
| fields - rest_api_peers.*

purpose:

requirements:

Needs to be executed on a search head that can query the cluster master REST API

comments:

The dbinspect API doesn't return consistent information about the size of buckets.

Work out data volumes by source type

| metadata type=sourcetypes
| noop sample_ratio=1000
| append [ search index=*  
   | eval size=length(_raw) 
   | stats avg(size) as average_event_size by sourcetype index
   ]
| stats values(totalCount) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day

purpose:

Efficiently calculate how much data is being indexed per day by source type. Very useful for calculating enterprise security data volumes

requirements:

Requires 6.3.x or later for the event sampling feature

comments:

Combines results from | metadata for the event counts and then multiplies them by the average event size. It automatically accounts for time ranges. You will need to modify the sample rate to suit your data volume. The metadata search turns out to be very approximate: it counts the values associated with buckets, so if you have buckets that are open for a very long time it will take the value for the entire period of the bucket, not just the period of your search time range. Consider using tstats if this is an issue in your environment (see the sketch below).
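
A hedged tstats-based variation of the counting step, keeping the same averaging and maths as the original; it drops the metadata call and the event sampling, so restrict the time range accordingly:

| tstats count as total_events where index=* by sourcetype
| append [ search index=*  
   | eval size=length(_raw) 
   | stats avg(size) as average_event_size by sourcetype
   ]
| stats values(total_events) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day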

Indexes my user can search.

| rest /services/data/indexes
| search
    [
    | rest /services/data/indexes
    | dedup title
    | table title
    | search
        [
            | rest splunk_server=local /services/authorization/roles
            | search
                [
                        | rest splunk_server=local /services/authentication/users
                        | search
                            [
                                        | rest /services/authentication/current-context
                                        | search type=splunk
                                        | table username
                                        | rename username as title ]
| fields roles
| mvexpand roles
| rename roles as title] imported_srchIndexesAllowed=*
| table imported_srchIndexesAllowed
| rename imported_srchIndexesAllowed as title
| mvexpand title] ]
| stats values(splunk_server) as splunkserver by title
| eval splunkserver=mvjoin(splunkserver,":")
| lookup non_internal_indexes title as title OUTPUT description as description
| fields title, description, splunkserver
| rename title AS Index

purpose:

requirements:

A lookup CSV named non_internal_indexes with columns "title" (the index name) and "description"
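
For illustration, the lookup file's contents might look like this (index names and descriptions are made up):

title,description
web,Production web server access logs
firewall,Perimeter firewall traffic
wineventlog,Windows event logs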

comments:

We use this in an environment with roughly 200 users to show each user which indexes they are allowed to search.

Size distribution of my auto_high_volume buckets

| dbinspect [
  | rest /services/data/indexes      
  | eval index=title      
  | stats values(maxDataSize) as maxDataSize by index      
  | where maxDataSize="auto_high_volume"      
  | eval index="index=".index      
  | stats values(index) as indexes      
  | mvcombine delim=" " indexes     
  | eval search=indexes ] 
| bin sizeOnDiskMB span=2log4 
| chart limit=0 count by sizeOnDiskMB index

purpose:

requirements:

comments:

This search was developed to visualise if buckets were being rolled early.

User Search Restrictions and Last Logins

| rest /services/authentication/users splunk_server=local
| table title realname email roles 
| mvexpand roles 
| join roles type=outer 
    [| rest /services/authorization/roles splunk_server=local
    | fields imported* title srch* 
    | fields - *Quota *TimeWin *capabilities 
    | rex mode=sed field=srchFilter "s/^\*$/true/" 
    | rex mode=sed field=imported_srchFilter "s/^\*$/true/" 
    | eval search_restrictions=if(imported_srchFilter="","( ".srchFilter." )",if(srchFilter="","( ".imported_srchFilter." )","( ".srchFilter." ) OR ( ".imported_srchFilter." ) ")) 
    | fields - srchIndexesDefault 
    | eval srchIndexesAllowed_new=mvjoin(srchIndexesAllowed," OR index=") 
    | eval index_restrictions="( index=".srchIndexesAllowed_new." )" 
    | table title search_restrictions index_restrictions 
    | rename title AS roles ] 
| stats values(roles) AS roles values(search_restrictions) AS search_restrictions values(index_restrictions) AS index_restrictions by title,realname,email 
| rex mode=sed field=search_restrictions "s/\( \)//g" 
| eval search_restrictions=mvjoin(search_restrictions," OR ") 
| eval index_restrictions=mvjoin(index_restrictions," OR ") 
| eval realname=upper(realname)
| rename title AS username
| eval restrictions=if(search_restrictions="( )",index_restrictions,search_restrictions." ".index_restrictions)
| table username realname email roles restrictions
| search username=* 
| rex mode=sed field=restrictions "s/^ OR //g" 
| join type=outer username 
    [ search (index=_audit "login attempt info=succeeded") OR (index=_internal "GET /splunk/en-US/account/login") 
    | stats max(_time) AS "last_login" by user 
    | rename user AS username ] 
| eval days_ago=floor((now()-last_login)/86400) 
| eval last_login=strftime(last_login,"%Y-%m-%d %H:%M:%S") 
| fillnull last_login value="Never"

purpose:

requirements:

comments:

Shows a table of the inherited search and index restrictions for each user, represented as an SPL string. In certain cases it may not be 100% accurate, but it covers most instances that I have run into. NOTE: it does not account for `searchFilterSelecting = false` defined in authorize.conf

Show links between tags, sourcetypes, apps and datamodels.

| rest splunk_server=local count=0 /servicesNS/-/-/admin/datamodel-files 
| spath input=eai:data output=base_search path=objects{}.baseSearch 
| spath input=eai:data output=constraints path=objects{}.constraints{}.search 
| eval tag_content = mvappend(base_search,constraints) 
| rex max_match=0 field=tag_content "tag=\"?(?<tag_name>\w+)\"?" 
| mvexpand tag_name 
| rename title AS datamodel 
| append 
    [| rest splunk_server=local count=0 /servicesNS/-/-/admin/eventtypes 
    | rename eai:acl.app AS app tags AS tag_name 
    | search app="*TA*" 
    | rex max_match=0 field=search "sourcetype=\"?(?<sourcetype>[^\s\"^)]+)\"?" 
    | mvexpand sourcetype 
    | mvexpand tag_name 
    | eval app_sourcetype=mvzip(app,sourcetype,"__") 
    | stats list(tag_name) as tag_name by app, sourcetype,app_sourcetype ] 
| stats list(datamodel) as datamodel, list(app) as app, list(app_sourcetype) as app_sourcetype by tag_name 
| search datamodel=* 
| stats values(datamodel) as datamodel, values(tag_name) as tags by app_sourcetype 
| eval tags=mvdedup(tags) 
| rex max_match=0 field=app_sourcetype "\"?(?<app>.+)__\"?" 
| rex max_match=0 field=app_sourcetype "__\"?(?<sourcetype>.+)\"?" 
| fields - app_sourcetype

purpose:

This search answers the question of which dashboards my new add-on will be used for in Enterprise Security.

requirements:

comments:

This is useful to see which app populates which datamodel in Enterprise Security, or in any other environment that uses datamodels.