table

table is a pretty popular search command and it shows up in all sorts of situations. Below are some really useful searches that combine table with other search commands.

Splunk Server's Time

* | head 1 | eval tnow = now() | fieldformat tnow=strftime(tnow, "%c %Z") | table tnow

purpose:

Shows the current time according to the Splunk server

requirements:

comments:
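A lighter-weight variant on newer Splunk versions generates a single result with makeresults instead of scanning indexed events (a sketch, same output):

| makeresults
| eval tnow = now()
| fieldformat tnow=strftime(tnow, "%c %Z")
| table tnow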

Detect Clock Skew

| rest /services/server/info
| eval updated_t=round(strptime(updated, "%Y-%m-%dT%H:%M:%S%z"))
| eval delta_t=now()-updated_t
| eval delta=tostring(abs(delta_t), "duration")
| table serverName, updated, updated_t, delta, delta_t

purpose:

Check for server clock skew

requirements:

comments:

If delta is anything other than roughly 00:00:01 (a one-second offset is easy to account for when polling a lot of indexers), you may have clock skew.
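To surface only the servers that actually look skewed, a threshold filter can be appended to the search above (a sketch; the 5-second threshold is an arbitrary assumption):

| rest /services/server/info
| eval updated_t=round(strptime(updated, "%Y-%m-%dT%H:%M:%S%z"))
| eval delta_t=now()-updated_t
| eval delta=tostring(abs(delta_t), "duration")
| where abs(delta_t) > 5
| table serverName, updated, delta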

Search to end all errors

index=_internal sourcetype="splunkd" log_level="ERROR" 
| stats sparkline count dc(host) as hosts last(_raw) as last_raw_msg values(sourcetype) as sourcetype last(_time) as last_msg_time first(_time) as first_msg_time values(index) as index by punct 
| eval delta=round((first_msg_time-last_msg_time),2) 
| eval msg_per_sec=round((count/delta),2) 
| convert ctime(last_msg_time) ctime(first_msg_time) 
| table last_raw_msg count hosts sparkline msg_per_sec sourcetype index first_msg_time last_msg_time delta  | sort -count

purpose:

Identifies frequently occurring errors in your Splunk instance. Long story short, knocking out the top 10 on this list will make your Splunk instance very happy

requirements:

comments:
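To focus on the top 10 offenders mentioned in the purpose, the search can simply be truncated after sorting; a condensed sketch:

index=_internal sourcetype="splunkd" log_level="ERROR"
| stats count dc(host) as hosts last(_raw) as last_raw_msg by punct
| sort -count
| head 10
| table last_raw_msg count hosts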

Unauthorized Foreign Activity

layout=edit | geoip clientip as clientip | table _time clientip client_country | where NOT (client_country="Germany" OR client_country="Austria" OR client_country="Switzerland")

purpose:

Detect unauthorized admin activity originating from foreign countries

requirements:

Logs with external source IPs

comments:

  • Search for admin activity – for example, in my webpage's CMS, search for "layout=edit"
  • Display all the IPs with table clientip _time
  • Enrich them with a geoip lookup (geoip clientip)
  • Display all changes with geo information:
    • layout=edit | lookup geoip clientip as clientip | table _time clientip client_country
  • Review them and create a simple whitelist: | where NOT (client_country="Germany" OR client_country="Austria" OR client_country="Switzerland") (a lookup-based variant is sketched below)
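A lookup-based whitelist keeps the country list out of the search string (a sketch; the country_whitelist lookup and its allowed_country column are hypothetical):

layout=edit
| geoip clientip as clientip
| lookup country_whitelist allowed_country as client_country OUTPUT allowed_country as allowed
| where isnull(allowed)
| table _time clientip client_country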

Speed / Distance Login Anomaly

index=geod
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| where speed > 500
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find pairs of consecutive events where the speed needed to cover the distance between them in the elapsed time is greater than 500 MPH

requirements:

haversine app; events with a clientip field

comments:
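If the haversine app is not available, roughly the same distance can be derived with plain eval via the haversine formula (a sketch that would replace the | haversine line above; it assumes the lat, lon, and last_latlon fields produced earlier in the search, and outputs distance in miles):

| eval last_lat=tonumber(mvindex(split(last_latlon, ","), 0)), last_lon=tonumber(mvindex(split(last_latlon, ","), 1))
| eval rlat1=last_lat*pi()/180, rlon1=last_lon*pi()/180, rlat2=lat*pi()/180, rlon2=lon*pi()/180
| eval a=sin((rlat2-rlat1)/2)*sin((rlat2-rlat1)/2) + cos(rlat1)*cos(rlat2)*sin((rlon2-rlon1)/2)*sin((rlon2-rlon1)/2)
| eval distance=3959*2*atan2(sqrt(a), sqrt(1-a))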

Auth anomaly basic with haversine

index=geod 
| iplocation clientip 
| sort _time 
| strcat lat "," lon latlon 
| streamstats current=f global=f window=1 last(latlon) as last_latlon
| eval last_latlon=if(isnull(last_latlon), latlon, last_latlon)
| streamstats current=f global=f window=1 last(_time) as last_ts
| eval time_since_last = _time - last_ts
| eval time_since_last=if(isnull(time_since_last), 0, time_since_last)
| haversine originField=last_latlon outputField=distance units=mi latlon
| eval speed=if(time_since_last==0, 0, (distance/(time_since_last/60/60)))
| strcat speed " MPH" speed
| table user, distance, _time, time_since_last, speed, _raw

purpose:

Find the speed needed to cover the distance between the IP locations of two consecutive login events

requirements:

haversine app; clientip field containing the IP address

comments:

Combine dbinspect and REST API data for buckets

| dbinspect index=*
| foreach * [eval dbinspect_<<FIELD>> = '<<FIELD>>']
| table dbinspect_*
| append [
  | rest splunk_server_group=dmc_group_cluster_master "/services/cluster/master/buckets"
  | foreach * [eval rest_api_<<FIELD>> = '<<FIELD>>']
  | table rest_api_* 
  ]
| eval bucketId=if(isnull(rest_api_title),dbinspect_bucketId,rest_api_title)
| stats values(*) as * by bucketId
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=""]
| foreach rest_api_peers.*.* [eval rest_api_<<MATCHSEG2>>=if("<<MATCHSEG1>>"=dbinspect_bucketId,'<<FIELD>>','<<MATCHSEG2>>')]
| fields - rest_api_peers.*

purpose:

requirements:

Needs to be executed on a search head that can query the cluster master REST API

comments:

The dbinspect API doesn't return consistent information about the size of buckets.
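When a rough per-index size is all you need, summing the per-bucket sizes that dbinspect does report is often enough (a sketch; keep the caveat above in mind):

| dbinspect index=*
| stats sum(sizeOnDiskMB) as total_MB count as buckets by index, state
| sort -total_MB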

Work out data volumes by source type

| metadata type=sourcetypes
| noop sample_ratio=1000
| append [ search index=*  
   | eval size=len(_raw) 
   | stats avg(size) as average_event_size by sourcetype index
   ]
| stats values(totalCount) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day

purpose:

Efficiently calculate how much data is being indexed per day by source type. Very useful for sizing Splunk Enterprise Security data volumes

requirements:

Requires 6.3.x or later for the event sampling feature

comments:

Combines results from | metadata for counts and then multiplies them by the average event size, automatically accounting for the time range. You will need to modify the sample rate to be suitable for your data volume. The metadata search turns out to be very approximate: it counts the values associated with buckets, so if you have buckets that are open for a very long time it will use the count for the entire period of the bucket, not just your search time range. Consider using tstats if this is an issue in your environment.
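A tstats-based variant honors the search time range exactly instead of relying on bucket-level metadata (a sketch; it still multiplies by the sampled average event size, so the sample-rate caveat above still applies):

| tstats count as total_events where index=* by sourcetype
| append [ search index=*
   | eval size=len(_raw)
   | stats avg(size) as average_event_size by sourcetype ]
| stats values(total_events) as total_events values(average_event_size) as average_event_size by sourcetype
| addinfo
| eval period_days=(info_max_time-info_min_time)/(24*60*60)
| eval totalMB_per_day=floor(total_events*average_event_size/period_days/1024/1024)
| table sourcetype totalMB_per_day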

Indexes my user can search

| rest /services/data/indexes
| search
    [
    | rest /services/data/indexes
    | dedup title
    | table title
    | search
        [
            | rest splunk_server=local /services/authorization/roles
            | search
                [
                        | rest splunk_server=local /services/authentication/users
                        | search
                            [
                                        | rest /services/authentication/current-context
                                        | search type=splunk
                                        | table username
                                        | rename username as title ]
| fields roles
| mvexpand roles
| rename roles as title] imported_srchIndexesAllowed=*
| table imported_srchIndexesAllowed
| rename imported_srchIndexesAllowed as title
| mvexpand title] ]
| stats values(splunk_server) as splunkserver by title
| eval splunkserver=mvjoin(splunkserver,":")
| lookup non_internal_indexes title as title OUTPUT description as description
| fields title, description, splunkserver
| rename title AS Index

purpose:

requirements:

A lookup CSV (non_internal_indexes) with a "title" column holding the index name and a "description" column
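For reference, the lookup can be a simple two-column CSV; the index names and descriptions below are placeholders:

title,description
web,Web server access logs
firewall,Perimeter firewall events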

comments:

We use this in a ~200-user environment to show each user which indexes they are allowed to search.

Detect time problems (index time and event time differing significantly)

sourcetype=*
| eval diff=_indextime-_time
| eval indextime=strftime(_indextime,"%Y-%m-%d %H:%M:%S")
| table indextime _time diff sourcetype
| timechart avg(diff) max(diff) by sourcetype

purpose:

requirements:

comments:

This search simply graphs the time difference between the event time and the indexing time by sourcetype; this helps troubleshoot sourcetype time issues.
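The same idea can drive a scheduled alert by aggregating the lag and filtering on a threshold (a sketch; the 300-second threshold is an arbitrary assumption):

sourcetype=*
| eval diff=_indextime-_time
| stats avg(diff) as avg_lag_s max(diff) as max_lag_s by sourcetype
| where max_lag_s > 300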

User Search Restrictions and Last Logins

| rest /services/authentication/users splunk_server=local
| table title realname email roles 
| mvexpand roles 
| join roles type=outer 
    [| rest /services/authorization/roles splunk_server=local
    | fields imported* title srch* 
    | fields - *Quota *TimeWin *capabilities 
    | rex mode=sed field=srchFilter "s/^\*$/true/" 
    | rex mode=sed field=imported_srchFilter "s/^\*$/true/" 
    | eval search_restrictions=if(imported_srchFilter="","( ".srchFilter." )",if(srchFilter="","( ".imported_srchFilter." )","( ".srchFilter." ) OR ( ".imported_srchFilter." ) ")) 
    | fields - srchIndexesDefault 
    | eval srchIndexesAllowed_new=mvjoin(srchIndexesAllowed," OR index=") 
    | eval index_restrictions="( index=".srchIndexesAllowed_new." )" 
    | table title search_restrictions index_restrictions 
    | rename title AS roles ] 
| stats values(roles) AS roles values(search_restrictions) AS search_restrictions values(index_restrictions) AS index_restrictions by title,realname,email 
| rex mode=sed field=search_restrictions "s/\( \)//g" 
| eval search_restrictions=mvjoin(search_restrictions," OR ") 
| eval index_restrictions=mvjoin(index_restrictions," OR ") 
| eval realname=upper(realname)
| rename title AS username
| eval restrictions=if(search_restrictions="( )",index_restrictions,search_restrictions." ".index_restrictions)
| table username email realname roles restrictions
| search username=* 
| rex mode=sed field=restrictions "s/^ OR //g" 
| join type=outer username 
    [ search (index=_audit "login attempt" info=succeeded) OR (index=_internal "GET /splunk/en-US/account/login") 
    | stats max(_time) AS "last_login" by user 
    | rename user AS username ] 
| eval days_ago=floor((now()-last_login)/86400) 
| eval last_login=strftime(last_login,"%Y-%m-%d %H:%M:%S") 
| fillnull value="Never" last_login

purpose:

requirements:

comments:

Shows the inherited search and index restrictions for each user, represented as an SPL string. In certain cases it may not be 100% accurate, but it covers most instances that I have run into. NOTE: does not account for `searchFilterSelecting = false` defined in authorize.conf