Detect unauthorized data transfers using timeseries anomaly ASIM Web Session
Id | 5965d3e7-8ed0-477c-9b42-e75d9237fab0 |
Rulename | Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session) |
Description | This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial. The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query’s output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, destination ports, and bytes sent. It may be necessary to run queries for individual source IP addresses from the provided ‘SourceIPList’ to identify any suspicious activity that warrants further investigation. |
Severity | Medium |
Tactics | Exfiltration |
Techniques | T1030 |
Kind | Scheduled |
Query frequency | 1d |
Query period | 14d |
Trigger threshold | 0 |
Trigger operator | gt |
Source Uri | https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml |
Version | 1.0.1 |
Arm template | 5965d3e7-8ed0-477c-9b42-e75d9237fab0.json |
// Purpose: flag hours of the previous day in which the bytes sent to public
// (non-private) destination IPs deviate anomalously from the hourly baseline,
// then list, per anomalous hour and EventProduct, the top sending source IPs.
//
// Tunables
let startTime = 14d;                  // length of the hourly time series
let endTime = 1d;                     // anomalies are reported for the previous full day
let timeframe = 1h;                   // bucket size of the time series
let scorethreshold = 5;               // anomaly threshold passed to series_decompose_anomalies
let bytessentperhourthreshold = 10;   // compared against SentBytesinMB (MB per source IP per hour)
// calculate avg. eps (events per second) over the last day.
// Divide by 3600.0 so the per-hour rate is not truncated by integer division before averaging.
let eps = materialize(_Im_WebSession(starttime=ago(1d))
    | project TimeGenerated
    | summarize AvgPerSec = count() / 3600.0 by bin(TimeGenerated, 1h)
    | summarize round(avg(AvgPerSec))
    );
// sumexist is true only when WebSession_Summarized_SrcIP_CL returned rows for the last day.
// isfuzzy=true keeps the union valid when that custom table is absent from the workspace.
let summarizationexist = (
    union isfuzzy=true
        (
        WebSession_Summarized_SrcIP_CL
        | where EventTime_t > ago(1d)
        | project v = int(2)
        ),
        (
        print int(1)
        | project v = print_0
        )
    | summarize maxv = max(v)
    | extend sumexist = (maxv > 1)
    );
// Hourly bytes sent per EventProduct.
// The three raw-log branches are mutually exclusive (gated on eps) and only produce rows when
// sumexist is false: the single datatable row carries sumexist=false and is joined on sumexist.
// The higher the event rate, the shorter the raw-log lookback (2d / 3d / 4d).
let TimeSeriesData = union isfuzzy=true
        (
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) > 1000
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(2d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) between (501 .. 1000)
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(3d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) <= 500
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(4d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        // Pre-summarized data, read for the whole startTime window when the table exists
        WebSession_Summarized_SrcIP_CL
        | where EventTime_t between (ago(startTime) .. now())
        | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
        | project
            SrcBytesSum=tolong(SrcBytes_d),
            EventTime=EventTime_t,
            EventProduct = EventProduct_s
        )
    | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
// TimeSeriesData block ends here
// Take only anomalies in TimeSeriesData
let TimeSeriesAnomalies = materialize(TimeSeriesData
    | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
    | mv-expand
        TotalBytesSent to typeof(long),
        EventTime to typeof(datetime),
        anomalies to typeof(double),
        score to typeof(double),
        baseline to typeof(long)
    | where anomalies > 0 and baseline > 0   // keep positive anomalies with a positive baseline
    | extend AnomalyHour = EventTime
    | extend
        // Divide by 1024.0: TotalBytesSent and baseline are long, and long / long would
        // truncate to whole MB, leaving round(..., 2) with nothing to round.
        TotalBytesSentinMBperHour = round(TotalBytesSent / 1024.0 / 1024.0, 2),
        BaselineBytesSentinMBperHour = round(baseline / 1024.0 / 1024.0, 2),
        score = round(score, 2)
    | project
        EventProduct,
        AnomalyHour,
        TotalBytesSentinMBperHour,
        BaselineBytesSentinMBperHour,
        anomalies,
        score
    | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
    );
// TimeSeriesAnomalies block ends here
let AnomalyHours = materialize (TimeSeriesAnomalies
    | project AnomalyHour);
// Previous day aggregated per hour, per source IP and EventProduct
let PreviousDayLogs =
    _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
    | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
    | where not(ipv4_is_private(DstIpAddr))
    | project
        TimeGenerated,
        DstIpAddr,
        SrcIpAddr,
        SrcBytes,
        DstBytes,
        DstPortNumber,
        EventProduct
    | extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour
    | where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours
    | summarize
        HourlyCount = count(),
        // Only the latest timestamp is used downstream, so max() replaces arg_max(TimeGenerated, *)
        TimeGeneratedMax = max(TimeGenerated),
        DestinationIPList = make_set(DstIpAddr, 100),
        DestinationPortList = make_set(DstPortNumber, 100),
        // Sum the raw bytes first and convert to MB once. Converting each event with integer
        // division before summing counted every event below 1 MiB as 0 MB, so sources sending
        // many small requests never exceeded bytessentperhourthreshold.
        // NOTE(review): assumes SrcBytes / DstBytes are integer (long) columns - confirm against the parsers in use.
        SentBytesinMB = tolong(sum(SrcBytes) / (1024 * 1024)),
        ReceivedBytesinMB = tolong(sum(DstBytes) / (1024 * 1024))
        by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
    | where SentBytesinMB > bytessentperhourthreshold
    | sort by TimeGeneratedHour asc, SentBytesinMB desc
    | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition
    | where Rank <= 10 // Selecting Top 10 records with Highest BytesSent in each Hour
    | project
        EventProduct,
        TimeGeneratedHour,
        TimeGeneratedMax,
        SrcIpAddr,
        DestinationIPList,
        DestinationPortList,
        SentBytesinMB,
        ReceivedBytesinMB,
        Rank,
        HourlyCount;
// PreviousDayLogs block ends here
// Attach the top senders to each anomalous (EventProduct, hour) and collapse to one row per anomaly
TimeSeriesAnomalies
| join kind = inner (PreviousDayLogs
    | extend AnomalyHour = TimeGeneratedHour)
    on EventProduct, AnomalyHour
| sort by score desc
| project
    EventProduct,
    AnomalyHour,
    TimeGeneratedMax,
    SrcIpAddr,
    DestinationIPList,
    DestinationPortList,
    SentBytesinMB,
    ReceivedBytesinMB,
    TotalBytesSentinMBperHour,
    BaselineBytesSentinMBperHour,
    score,
    anomalies,
    HourlyCount
| summarize
    EventCount = sum(HourlyCount),
    startTimeUtc = min(TimeGeneratedMax),
    EndTimeUtc = max(TimeGeneratedMax),
    SentBytesinMB = sum(SentBytesinMB),
    ReceivedBytesinMB = sum(ReceivedBytesinMB),
    SourceIP = take_any(SrcIpAddr),            // one arbitrary member of SourceIPList
    SourceIPList = make_set(SrcIpAddr, 10),
    DestinationIPList = make_set(DestinationIPList, 100),
    DestinationPortList = make_set(DestinationPortList, 100)
    by
    AnomalyHour,
    TotalBytesSentinMBperHour,
    BaselineBytesSentinMBperHour,
    score,
    anomalies,
    EventProduct
| project
    EventProduct,
    AnomalyHour,
    startTimeUtc,
    EndTimeUtc,
    SourceIP,
    SourceIPList,
    DestinationIPList,
    DestinationPortList,
    SentBytesinMB,
    ReceivedBytesinMB,
    TotalBytesSentinMBperHour,
    BaselineBytesSentinMBperHour,
    anomalies,
    score,
    EventCount
# Rule metadata for the scheduled analytic rule whose KQL is given under `query`.
relevantTechniques:
- T1030
name: Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
requiredDataConnectors: []
# Maps the query's `SourceIP` output column to the Address identifier of an IP entity.
entityMappings:
- fieldMappings:
- identifier: Address
columnName: SourceIP
entityType: IP
# Together with `triggerOperator: gt` below: the rule triggers when the query returns more than 0 results.
triggerThreshold: 0
id: 5965d3e7-8ed0-477c-9b42-e75d9237fab0
tactics:
- Exfiltration
version: 1.0.1
# Each entry names a column of the query's final `project`; key and column name are identical.
customDetails:
DestinationPortList: DestinationPortList
score: score
DestinationIPList: DestinationIPList
ReceivedBytesinMB: ReceivedBytesinMB
anomalies: anomalies
EventCount: EventCount
SourceIPList: SourceIPList
SentBytesinMB: SentBytesinMB
# Same 14d window as `startTime` in the query.
queryPeriod: 14d
# {{SourceIP}} and {{SourceIPList}} are filled from the query's output columns of the same name.
# In the query, SourceIP is produced by take_any(SrcIpAddr), i.e. one arbitrary member of SourceIPList.
alertDetailsOverride:
alertDisplayNameFormat: IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels
alertDescriptionFormat: "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour"
triggerOperator: gt
kind: Scheduled
tags:
- Schema: WebSession
SchemaVersion: 0.2.6
eventGroupingSettings:
aggregationKind: AlertPerResult
OriginalUri: https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml
# Same 1d interval as `endTime` in the query, which limits reported anomalies to the previous day.
queryFrequency: 1d
severity: Medium
status: Available
# NOTE(review): the description refers to 'SourceIPlist'; the query's output column is `SourceIPList`.
description: |
'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.
The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'
query: |
  // Purpose: flag hours of the previous day in which the bytes sent to public
  // (non-private) destination IPs deviate anomalously from the hourly baseline,
  // then list, per anomalous hour and EventProduct, the top sending source IPs.
  //
  // Tunables
  let startTime = 14d;                  // length of the hourly time series
  let endTime = 1d;                     // anomalies are reported for the previous full day
  let timeframe = 1h;                   // bucket size of the time series
  let scorethreshold = 5;               // anomaly threshold passed to series_decompose_anomalies
  let bytessentperhourthreshold = 10;   // compared against SentBytesinMB (MB per source IP per hour)
  // calculate avg. eps (events per second) over the last day.
  // Divide by 3600.0 so the per-hour rate is not truncated by integer division before averaging.
  let eps = materialize(_Im_WebSession(starttime=ago(1d))
      | project TimeGenerated
      | summarize AvgPerSec = count() / 3600.0 by bin(TimeGenerated, 1h)
      | summarize round(avg(AvgPerSec))
      );
  // sumexist is true only when WebSession_Summarized_SrcIP_CL returned rows for the last day.
  // isfuzzy=true keeps the union valid when that custom table is absent from the workspace.
  let summarizationexist = (
      union isfuzzy=true
          (
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t > ago(1d)
          | project v = int(2)
          ),
          (
          print int(1)
          | project v = print_0
          )
      | summarize maxv = max(v)
      | extend sumexist = (maxv > 1)
      );
  // Hourly bytes sent per EventProduct.
  // The three raw-log branches are mutually exclusive (gated on eps) and only produce rows when
  // sumexist is false: the single datatable row carries sumexist=false and is joined on sumexist.
  // The higher the event rate, the shorter the raw-log lookback (2d / 3d / 4d).
  let TimeSeriesData = union isfuzzy=true
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) > 1000
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(2d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) between (501 .. 1000)
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(3d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) <= 500
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(4d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          // Pre-summarized data, read for the whole startTime window when the table exists
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t between (ago(startTime) .. now())
          | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
          | project
              SrcBytesSum=tolong(SrcBytes_d),
              EventTime=EventTime_t,
              EventProduct = EventProduct_s
          )
      | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
  // TimeSeriesData block ends here
  // Take only anomalies in TimeSeriesData
  let TimeSeriesAnomalies = materialize(TimeSeriesData
      | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
      | mv-expand
          TotalBytesSent to typeof(long),
          EventTime to typeof(datetime),
          anomalies to typeof(double),
          score to typeof(double),
          baseline to typeof(long)
      | where anomalies > 0 and baseline > 0   // keep positive anomalies with a positive baseline
      | extend AnomalyHour = EventTime
      | extend
          // Divide by 1024.0: TotalBytesSent and baseline are long, and long / long would
          // truncate to whole MB, leaving round(..., 2) with nothing to round.
          TotalBytesSentinMBperHour = round(TotalBytesSent / 1024.0 / 1024.0, 2),
          BaselineBytesSentinMBperHour = round(baseline / 1024.0 / 1024.0, 2),
          score = round(score, 2)
      | project
          EventProduct,
          AnomalyHour,
          TotalBytesSentinMBperHour,
          BaselineBytesSentinMBperHour,
          anomalies,
          score
      | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
      );
  // TimeSeriesAnomalies block ends here
  let AnomalyHours = materialize (TimeSeriesAnomalies
      | project AnomalyHour);
  // Previous day aggregated per hour, per source IP and EventProduct
  let PreviousDayLogs =
      _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
      | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
      | where not(ipv4_is_private(DstIpAddr))
      | project
          TimeGenerated,
          DstIpAddr,
          SrcIpAddr,
          SrcBytes,
          DstBytes,
          DstPortNumber,
          EventProduct
      | extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour
      | where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours
      | summarize
          HourlyCount = count(),
          // Only the latest timestamp is used downstream, so max() replaces arg_max(TimeGenerated, *)
          TimeGeneratedMax = max(TimeGenerated),
          DestinationIPList = make_set(DstIpAddr, 100),
          DestinationPortList = make_set(DstPortNumber, 100),
          // Sum the raw bytes first and convert to MB once. Converting each event with integer
          // division before summing counted every event below 1 MiB as 0 MB, so sources sending
          // many small requests never exceeded bytessentperhourthreshold.
          // NOTE(review): assumes SrcBytes / DstBytes are integer (long) columns - confirm against the parsers in use.
          SentBytesinMB = tolong(sum(SrcBytes) / (1024 * 1024)),
          ReceivedBytesinMB = tolong(sum(DstBytes) / (1024 * 1024))
          by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
      | where SentBytesinMB > bytessentperhourthreshold
      | sort by TimeGeneratedHour asc, SentBytesinMB desc
      | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition
      | where Rank <= 10 // Selecting Top 10 records with Highest BytesSent in each Hour
      | project
          EventProduct,
          TimeGeneratedHour,
          TimeGeneratedMax,
          SrcIpAddr,
          DestinationIPList,
          DestinationPortList,
          SentBytesinMB,
          ReceivedBytesinMB,
          Rank,
          HourlyCount;
  // PreviousDayLogs block ends here
  // Attach the top senders to each anomalous (EventProduct, hour) and collapse to one row per anomaly
  TimeSeriesAnomalies
  | join kind = inner (PreviousDayLogs
      | extend AnomalyHour = TimeGeneratedHour)
      on EventProduct, AnomalyHour
  | sort by score desc
  | project
      EventProduct,
      AnomalyHour,
      TimeGeneratedMax,
      SrcIpAddr,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      HourlyCount
  | summarize
      EventCount = sum(HourlyCount),
      startTimeUtc = min(TimeGeneratedMax),
      EndTimeUtc = max(TimeGeneratedMax),
      SentBytesinMB = sum(SentBytesinMB),
      ReceivedBytesinMB = sum(ReceivedBytesinMB),
      SourceIP = take_any(SrcIpAddr),            // one arbitrary member of SourceIPList
      SourceIPList = make_set(SrcIpAddr, 10),
      DestinationIPList = make_set(DestinationIPList, 100),
      DestinationPortList = make_set(DestinationPortList, 100)
      by
      AnomalyHour,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      EventProduct
  | project
      EventProduct,
      AnomalyHour,
      startTimeUtc,
      EndTimeUtc,
      SourceIP,
      SourceIPList,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      anomalies,
      score,
      EventCount
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"workspace": {
"type": "String"
}
},
"resources": [
{
"apiVersion": "2024-01-01-preview",
"id": "[concat(resourceId('Microsoft.OperationalInsights/workspaces/providers', parameters('workspace'), 'Microsoft.SecurityInsights'),'/alertRules/5965d3e7-8ed0-477c-9b42-e75d9237fab0')]",
"kind": "Scheduled",
"name": "[concat(parameters('workspace'),'/Microsoft.SecurityInsights/5965d3e7-8ed0-477c-9b42-e75d9237fab0')]",
"properties": {
"alertDetailsOverride": {
"alertDescriptionFormat": "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour",
"alertDisplayNameFormat": "IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels"
},
"alertRuleTemplateName": "5965d3e7-8ed0-477c-9b42-e75d9237fab0",
"customDetails": {
"anomalies": "anomalies",
"DestinationIPList": "DestinationIPList",
"DestinationPortList": "DestinationPortList",
"EventCount": "EventCount",
"ReceivedBytesinMB": "ReceivedBytesinMB",
"score": "score",
"SentBytesinMB": "SentBytesinMB",
"SourceIPList": "SourceIPList"
},
"description": "'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.\nThe score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'\n",
"displayName": "Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)",
"enabled": true,
"entityMappings": [
{
"entityType": "IP",
"fieldMappings": [
{
"columnName": "SourceIP",
"identifier": "Address"
}
]
}
],
"eventGroupingSettings": {
"aggregationKind": "AlertPerResult"
},
"OriginalUri": "https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml",
"query": "let startTime = 14d;\nlet endTime = 1d;\nlet timeframe = 1h;\nlet scorethreshold = 5;\nlet bytessentperhourthreshold = 10;\n// calculate avg. eps(events per second)\nlet eps = materialize(_Im_WebSession(starttime=ago(1d))\n | project TimeGenerated\n | summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)\n | summarize round(avg(AvgPerSec))\n );\nlet summarizationexist = (\n union isfuzzy=true \n (\n WebSession_Summarized_SrcIP_CL\n | where EventTime_t > ago(1d) \n | project v = int(2)\n ),\n (\n print int(1) \n | project v = print_0\n )\n | summarize maxv = max(v)\n | extend sumexist = (maxv > 1)\n );\nlet TimeSeriesData = union isfuzzy=true \n (\n (datatable(exists: int, sumexist: bool)[1, false]\n | where toscalar(eps) > 1000\n | join (summarizationexist) on sumexist)\n | join (\n _Im_WebSession(starttime=ago(2d), endtime=now())\n | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n | where isnotempty(DstIpAddr)\n and not(ipv4_is_private(DstIpAddr))\n and isnotempty(SrcBytes)\n | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n | extend EventTime = TimeGenerated, exists=int(1)\n )\n on exists\n | project-away exists*, maxv, sum*\n ),\n (\n (datatable(exists: int, sumexist: bool)[1, false]\n | where toscalar(eps) between (501 .. 
1000)\n | join (summarizationexist) on sumexist)\n | join (\n _Im_WebSession(starttime=ago(3d), endtime=now())\n | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n | where isnotempty(DstIpAddr)\n and not(ipv4_is_private(DstIpAddr))\n and isnotempty(SrcBytes)\n | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n | extend EventTime = TimeGenerated, exists=int(1)\n )\n on exists\n | project-away exists*, maxv, sum*\n ),\n (\n (datatable(exists: int, sumexist: bool)[1, false]\n | where toscalar(eps) <= 500\n | join (summarizationexist) on sumexist)\n | join (\n _Im_WebSession(starttime=ago(4d), endtime=now())\n | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n | where isnotempty(DstIpAddr)\n and not(ipv4_is_private(DstIpAddr))\n and isnotempty(SrcBytes)\n | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n | extend EventTime = TimeGenerated, exists=int(1)\n )\n on exists\n | project-away exists*, maxv, sum*\n ),\n (\n WebSession_Summarized_SrcIP_CL\n | where EventTime_t between (ago(startTime) .. 
now())\n | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)\n | project\n SrcBytesSum=tolong(SrcBytes_d),\n EventTime=EventTime_t,\n EventProduct = EventProduct_s\n )\n | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;\n// TimeSeriesData block ends here\n//Take only anomalies in TimeSeriesData\nlet TimeSeriesAnomalies = materialize(TimeSeriesData\n | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')\n | mv-expand\n TotalBytesSent to typeof(long),\n EventTime to typeof(datetime),\n anomalies to typeof(double),\n score to typeof(double),\n baseline to typeof(long)\n | where anomalies > 0 and baseline > 0\n | extend AnomalyHour = EventTime\n | extend\n TotalBytesSentinMBperHour = round(((TotalBytesSent / 1024) / 1024), 2),\n BaselineBytesSentinMBperHour = round(((baseline / 1024) / 1024), 2),\n score = round(score, 2)\n | project\n EventProduct,\n AnomalyHour,\n TotalBytesSentinMBperHour,\n BaselineBytesSentinMBperHour,\n anomalies,\n score\n | where AnomalyHour between (startofday(ago(endTime)) .. 
startofday(now())) // Get TimeSeriesAnomalies in previous day\n );\n// TimeSeriesAlerts block end here\nlet AnomalyHours = materialize (TimeSeriesAnomalies\n | project AnomalyHour);\n//Previous day aggregated per hour\nlet PreviousDayLogs = \n _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))\n | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)\n | where not(ipv4_is_private(DstIpAddr))\n | project\n TimeGenerated,\n DstIpAddr,\n SrcIpAddr,\n SrcBytes,\n DstBytes,\n DstPortNumber,\n EventProduct\n | extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour\n | where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours\n | extend\n SentBytesinMB = ((SrcBytes / 1024) / 1024),\n ReceivedBytesinMB = ((DstBytes / 1024) / 1024)\n | summarize\n HourlyCount = count(),\n TimeGeneratedMax = arg_max(TimeGenerated, *),\n DestinationIPList = make_set(DstIpAddr, 100),\n DestinationPortList = make_set(DstPortNumber, 100),\n SentBytesinMB = tolong(sum(SentBytesinMB)),\n ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))\n by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)\n | where SentBytesinMB > bytessentperhourthreshold\n | sort by TimeGeneratedHour asc, SentBytesinMB desc\n | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition\n | where Rank <= 10 // Selecting Top 10 records with Highest BytesSent in each Hour\n | project\n EventProduct,\n TimeGeneratedHour,\n TimeGeneratedMax,\n SrcIpAddr,\n DestinationIPList,\n DestinationPortList,\n SentBytesinMB,\n ReceivedBytesinMB,\n Rank,\n HourlyCount;\n// PreviousDayLogs block ends here\nTimeSeriesAnomalies\n| join kind = inner (PreviousDayLogs\n | extend AnomalyHour = TimeGeneratedHour)\n on EventProduct, AnomalyHour\n| sort by score desc\n| project\n EventProduct,\n AnomalyHour,\n TimeGeneratedMax,\n SrcIpAddr,\n 
DestinationIPList,\n DestinationPortList,\n SentBytesinMB,\n ReceivedBytesinMB,\n TotalBytesSentinMBperHour,\n BaselineBytesSentinMBperHour,\n score,\n anomalies,\n HourlyCount\n| summarize\n EventCount = sum(HourlyCount),\n startTimeUtc = min(TimeGeneratedMax),\n EndTimeUtc = max(TimeGeneratedMax),\n SentBytesinMB = sum(SentBytesinMB),\n ReceivedBytesinMB = sum(ReceivedBytesinMB),\n SourceIP = take_any(SrcIpAddr),\n SourceIPList = make_set(SrcIpAddr, 10),\n DestinationIPList = make_set(DestinationIPList, 100),\n DestinationPortList = make_set(DestinationPortList, 100)\n by\n AnomalyHour,\n TotalBytesSentinMBperHour,\n BaselineBytesSentinMBperHour,\n score,\n anomalies,\n EventProduct\n| project\n EventProduct,\n AnomalyHour,\n startTimeUtc,\n EndTimeUtc,\n SourceIP,\n SourceIPList,\n DestinationIPList,\n DestinationPortList,\n SentBytesinMB,\n ReceivedBytesinMB,\n TotalBytesSentinMBperHour,\n BaselineBytesSentinMBperHour,\n anomalies,\n score,\n EventCount\n",
"queryFrequency": "P1D",
"queryPeriod": "P14D",
"severity": "Medium",
"status": "Available",
"subTechniques": [],
"suppressionDuration": "PT1H",
"suppressionEnabled": false,
"tactics": [
"Exfiltration"
],
"tags": [
{
"Schema": "WebSession",
"SchemaVersion": "0.2.6"
}
],
"techniques": [
"T1030"
],
"templateVersion": "1.0.1",
"triggerOperator": "GreaterThan",
"triggerThreshold": 0
},
"type": "Microsoft.OperationalInsights/workspaces/providers/alertRules"
}
]
}