Microsoft Sentinel Analytic Rules
cloudbrothers.infoAzure Sentinel RepoToggle Dark/Light/Auto modeToggle Dark/Light/Auto modeToggle Dark/Light/Auto modeBack to homepage

Detect unauthorized data transfers using timeseries anomaly ASIM Web Session

Back
Id: 5965d3e7-8ed0-477c-9b42-e75d9237fab0
Rulename: Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
Description: This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.

The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query’s output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided ‘SourceIPlist’ to identify any suspicious activity that warrants further investigation.
Severity: Medium
Tactics: Exfiltration
Techniques: T1030
Kind: Scheduled
Query frequency: 1d
Query period: 14d
Trigger threshold: 0
Trigger operator: gt
Source Uri: https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml
Version: 1.0.1
Arm template: 5965d3e7-8ed0-477c-9b42-e75d9237fab0.json
Deploy To Azure
// Tuning parameters for the rule.
// Lookback used for the summary table and as the start of the hourly time series.
let startTime = 14d;
// Size of the most recent window (previous day) in which anomalies are reported and raw logs are read.
let endTime = 1d;
// Step of the time series and bin size of the per-source aggregation.
let timeframe = 1h;
// Anomaly threshold passed to series_decompose_anomalies().
let scorethreshold = 5;
// Lower bound applied to the per-source SentBytesinMB value (the compared value is in MB, despite the name).
let bytessentperhourthreshold = 10;
// Average ingestion rate of the last day in events per second (EPS).
// The hourly count is divided by 3600.0 (a real): count() returns a long, and
// long / long division truncates, which would floor every hourly rate before
// the average is taken. The average is rounded to a whole number, so the EPS
// tiers tested later (<= 500, 501 .. 1000, > 1000) leave no gaps.
let eps = materialize(_Im_WebSession(starttime=ago(1d))
    | project TimeGenerated
    | summarize AvgPerSec = count() / 3600.0 by bin(TimeGenerated, 1h)
    | summarize round(avg(AvgPerSec))
    );
// Probe for the optional summary table WebSession_Summarized_SrcIP_CL.
// The constant row (v = 1) guarantees a result; every summary row from the last
// day contributes v = 2. isfuzzy=true lets the union resolve even when the
// summary table cannot be found.
// Output: a single row with maxv and sumexist = (maxv > 1).
let summarizationexist = (
    union isfuzzy=true
        (WebSession_Summarized_SrcIP_CL
            | where EventTime_t > ago(1d)
            | project v = int(2)),
        (print int(1)
            | project v = print_0)
    | summarize maxv = max(v)
    | extend sumexist = (maxv > 1)
    );
// TimeSeriesData: hourly series of bytes sent to public destinations, one series per EventProduct.
// Input rows are the union of:
//   * one of three raw _Im_WebSession branches, selected by the eps tier
//     (> 1000: last 2d, 501 .. 1000: last 3d, <= 500: last 4d), and
//   * the rows of WebSession_Summarized_SrcIP_CL for the last startTime.
// Each raw branch starts from a one-row datatable (exists = 1, sumexist = false). That row
// survives only if the eps condition holds and the join on sumexist finds a match, i.e. only
// when summarizationexist reports sumexist == false. The surviving row is then joined to the
// hourly raw aggregation on the constant exists = 1, and the helper columns are projected away.
let TimeSeriesData = union isfuzzy=true 
        (
        // Tier 1: eps > 1000 -> read 2 days of raw events.
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) > 1000
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(2d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        // Tier 2: eps between 501 and 1000 -> read 3 days of raw events.
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) between (501 .. 1000)
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(3d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        // Tier 3: eps <= 500 -> read 4 days of raw events.
        (datatable(exists: int, sumexist: bool)[1, false]
        | where toscalar(eps) <= 500
        | join (summarizationexist) on sumexist)
        | join (
            _Im_WebSession(starttime=ago(4d), endtime=now())
            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
            | where isnotempty(DstIpAddr)
                and not(ipv4_is_private(DstIpAddr))
                and isnotempty(SrcBytes)
            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
            | extend EventTime = TimeGenerated, exists=int(1)
            )
            on exists
        | project-away exists*, maxv, sum*
        ),
        (
        // Summary table rows for the whole startTime lookback, mapped to the same three columns.
        WebSession_Summarized_SrcIP_CL
        | where EventTime_t between (ago(startTime) .. now())
        | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
        | project
            SrcBytesSum=tolong(SrcBytes_d),
            EventTime=EventTime_t,
            EventProduct = EventProduct_s
        )
    // One point per timeframe step, from the start of the day startTime ago up to the start of today.
    | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
// TimeSeriesData block ends here
// TimeSeriesAnomalies: anomalous hours of the previous day, per EventProduct.
// series_decompose_anomalies() yields an anomaly flag, a score and a baseline for every
// point of TotalBytesSent; mv-expand turns the series back into one row per hour.
// Only rows with anomalies > 0 and baseline > 0 are kept.
// The MB conversions divide by 1024.0 (a real): TotalBytesSent and baseline are expanded
// as long, and long / long division would truncate to whole MB, making round(..., 2) a no-op.
let TimeSeriesAnomalies = materialize(TimeSeriesData
    | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
    | mv-expand
        TotalBytesSent to typeof(long),
        EventTime to typeof(datetime),
        anomalies to typeof(double),
        score to typeof(double),
        baseline to typeof(long)
    | where anomalies > 0 and baseline > 0
    | extend AnomalyHour = EventTime
    | extend
        TotalBytesSentinMBperHour = round(TotalBytesSent / 1024.0 / 1024.0, 2),
        BaselineBytesSentinMBperHour = round(baseline / 1024.0 / 1024.0, 2),
        score = round(score, 2)
    | project
        EventProduct,
        AnomalyHour,
        TotalBytesSentinMBperHour,
        BaselineBytesSentinMBperHour,
        anomalies,
        score
    | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
        );
// TimeSeriesAnomalies block ends here
// Hours flagged as anomalous; PreviousDayLogs filters the raw events against this list.
let AnomalyHours = materialize(
    TimeSeriesAnomalies
    | project AnomalyHour
    );
// PreviousDayLogs: previous day's sessions to public destinations, restricted to the
// anomalous hours and aggregated per source IP, product and hour.
//  * Bytes are summed per group and converted to MB once, using real division. Converting
//    each event with integer division (bytes / 1024 / 1024) floors every session below
//    1 MB to 0, which badly under-counts the per-source totals.
//    NOTE(review): assumes SrcBytes/DstBytes are whole-number (long) columns as in the
//    ASIM schema - confirm for custom parsers.
//  * max(TimeGenerated) is used for TimeGeneratedMax: only the timestamp is consumed later,
//    and a wildcard arg_max would also emit columns whose names clash with the aggregates.
//  * The top-10 ranking restarts per hour AND per EventProduct, because anomalies and the
//    final join are per EventProduct; ranking across products could push the anomalous
//    product's sources out of the top 10.
let PreviousDayLogs = 
    _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
    | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
    | where not(ipv4_is_private(DstIpAddr))
    | project
        TimeGenerated,
        DstIpAddr,
        SrcIpAddr,
        SrcBytes,
        DstBytes,
        DstPortNumber,
        EventProduct
    | extend DateHour = bin(TimeGenerated, timeframe) // round down to the start of the timeframe bin
    | where DateHour in (AnomalyHours) // keep only the hours flagged as anomalous
    | summarize
        HourlyCount = count(),
        TimeGeneratedMax = max(TimeGenerated),
        DestinationIPList = make_set(DstIpAddr, 100),
        DestinationPortList = make_set(DstPortNumber, 100),
        SentBytes = sum(SrcBytes),
        ReceivedBytes = sum(DstBytes)
        by SrcIpAddr, EventProduct, TimeGeneratedHour = DateHour
    | extend
        SentBytesinMB = tolong(SentBytes / 1024.0 / 1024.0),
        ReceivedBytesinMB = tolong(ReceivedBytes / 1024.0 / 1024.0)
    | where SentBytesinMB > bytessentperhourthreshold
    | sort by TimeGeneratedHour asc, EventProduct asc, SentBytesinMB desc
    | extend Rank = row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour or prev(EventProduct) != EventProduct) // rank within each (hour, product) partition
    | where Rank <= 10  // top 10 senders of each partition
    | project
        EventProduct,
        TimeGeneratedHour,
        TimeGeneratedMax,
        SrcIpAddr,
        DestinationIPList,
        DestinationPortList,
        SentBytesinMB,
        ReceivedBytesinMB,
        Rank,
        HourlyCount;
// PreviousDayLogs block ends here
// Final result: attach the per-source details to every anomalous (EventProduct, hour)
// pair and collapse them into one row per anomaly.
TimeSeriesAnomalies
| join kind=inner (
    PreviousDayLogs
    | extend AnomalyHour = TimeGeneratedHour
    ) on EventProduct, AnomalyHour
| sort by score desc
| project
    EventProduct, AnomalyHour, TimeGeneratedMax, SrcIpAddr,
    DestinationIPList, DestinationPortList,
    SentBytesinMB, ReceivedBytesinMB,
    TotalBytesSentinMBperHour, BaselineBytesSentinMBperHour,
    score, anomalies, HourlyCount
// One output row per anomaly: totals across the joined sources, one arbitrary source IP
// (take_any) plus the set of source IPs, destinations and ports.
| summarize
    EventCount = sum(HourlyCount),
    startTimeUtc = min(TimeGeneratedMax),
    EndTimeUtc = max(TimeGeneratedMax),
    SentBytesinMB = sum(SentBytesinMB),
    ReceivedBytesinMB = sum(ReceivedBytesinMB),
    SourceIP = take_any(SrcIpAddr),
    SourceIPList = make_set(SrcIpAddr, 10),
    DestinationIPList = make_set(DestinationIPList, 100),
    DestinationPortList = make_set(DestinationPortList, 100)
    by AnomalyHour, TotalBytesSentinMBperHour, BaselineBytesSentinMBperHour, score, anomalies, EventProduct
| project
    EventProduct, AnomalyHour, startTimeUtc, EndTimeUtc,
    SourceIP, SourceIPList, DestinationIPList, DestinationPortList,
    SentBytesinMB, ReceivedBytesinMB,
    TotalBytesSentinMBperHour, BaselineBytesSentinMBperHour,
    anomalies, score, EventCount
# Metadata of the scheduled analytic rule; the KQL itself is in the `query` key further down.
# Technique ID(s) associated with the rule.
relevantTechniques:
- T1030
name: Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
# No data connector is declared as a prerequisite.
requiredDataConnectors: []
# Maps the query output column SourceIP to the Address identifier of an IP entity.
entityMappings:
- fieldMappings:
  - identifier: Address
    columnName: SourceIP
  entityType: IP
# Used together with triggerOperator (gt) below; presumably the rule fires when the
# query returns more than 0 rows - confirm against the Sentinel rule schema.
triggerThreshold: 0
id: 5965d3e7-8ed0-477c-9b42-e75d9237fab0
tactics:
- Exfiltration
version: 1.0.1
# Custom detail name -> query output column of the same name.
customDetails:
  DestinationPortList: DestinationPortList
  score: score
  DestinationIPList: DestinationIPList
  ReceivedBytesinMB: ReceivedBytesinMB
  anomalies: anomalies
  EventCount: EventCount
  SourceIPList: SourceIPList
  SentBytesinMB: SentBytesinMB
# Same 14-day span as `startTime` in the query.
queryPeriod: 14d
# Alert title/description templates; the {{...}} placeholders name query output columns.
alertDetailsOverride:
  alertDisplayNameFormat: IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels
  alertDescriptionFormat: "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour"
triggerOperator: gt
kind: Scheduled
# Schema name and version this rule is tagged with.
tags:
- Schema: WebSession
  SchemaVersion: 0.2.6
# AlertPerResult: presumably one alert per query result row - confirm against the Sentinel rule schema.
eventGroupingSettings:
  aggregationKind: AlertPerResult
OriginalUri: https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml
# Same 1-day span as `endTime` in the query.
queryFrequency: 1d
severity: Medium
status: Available
description: |
  'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.
  The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'  
query: |
  let startTime = 14d;
  let endTime = 1d;
  let timeframe = 1h;
  let scorethreshold = 5;
  let bytessentperhourthreshold = 10;
  // Average ingestion rate of the last day in events per second (EPS).
  // The hourly count is divided by 3600.0 (a real): count() returns a long, and
  // long / long division truncates, which would floor every hourly rate before
  // the average is taken. The average is rounded to a whole number, so the EPS
  // tiers tested later (<= 500, 501 .. 1000, > 1000) leave no gaps.
  let eps = materialize(_Im_WebSession(starttime=ago(1d))
      | project TimeGenerated
      | summarize AvgPerSec = count() / 3600.0 by bin(TimeGenerated, 1h)
      | summarize round(avg(AvgPerSec))
      );
  let summarizationexist  = (
      union isfuzzy=true 
          (
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t > ago(1d) 
          | project v = int(2)
          ),
          (
          print int(1) 
          | project v = print_0
          )
      | summarize maxv = max(v)
      | extend sumexist = (maxv > 1)
      );
  let TimeSeriesData = union isfuzzy=true 
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) > 1000
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(2d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) between (501 .. 1000)
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(3d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) <= 500
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(4d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t between (ago(startTime) .. now())
          | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
          | project
              SrcBytesSum=tolong(SrcBytes_d),
              EventTime=EventTime_t,
              EventProduct = EventProduct_s
          )
      | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
  // TimeSeriesData block ends here
  // TimeSeriesAnomalies: anomalous hours of the previous day, per EventProduct.
  // series_decompose_anomalies() yields an anomaly flag, a score and a baseline for every
  // point of TotalBytesSent; mv-expand turns the series back into one row per hour.
  // Only rows with anomalies > 0 and baseline > 0 are kept.
  // The MB conversions divide by 1024.0 (a real): TotalBytesSent and baseline are expanded
  // as long, and long / long division would truncate to whole MB, making round(..., 2) a no-op.
  let TimeSeriesAnomalies = materialize(TimeSeriesData
      | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
      | mv-expand
          TotalBytesSent to typeof(long),
          EventTime to typeof(datetime),
          anomalies to typeof(double),
          score to typeof(double),
          baseline to typeof(long)
      | where anomalies > 0 and baseline > 0
      | extend AnomalyHour = EventTime
      | extend
          TotalBytesSentinMBperHour = round(TotalBytesSent / 1024.0 / 1024.0, 2),
          BaselineBytesSentinMBperHour = round(baseline / 1024.0 / 1024.0, 2),
          score = round(score, 2)
      | project
          EventProduct,
          AnomalyHour,
          TotalBytesSentinMBperHour,
          BaselineBytesSentinMBperHour,
          anomalies,
          score
      | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
          );
  // TimeSeriesAnomalies block ends here
  let AnomalyHours = materialize (TimeSeriesAnomalies
      | project AnomalyHour);
  // PreviousDayLogs: previous day's sessions to public destinations, restricted to the
  // anomalous hours and aggregated per source IP, product and hour.
  //  * Bytes are summed per group and converted to MB once, using real division. Converting
  //    each event with integer division (bytes / 1024 / 1024) floors every session below
  //    1 MB to 0, which badly under-counts the per-source totals.
  //    NOTE(review): assumes SrcBytes/DstBytes are whole-number (long) columns as in the
  //    ASIM schema - confirm for custom parsers.
  //  * max(TimeGenerated) is used for TimeGeneratedMax: only the timestamp is consumed later,
  //    and a wildcard arg_max would also emit columns whose names clash with the aggregates.
  //  * The top-10 ranking restarts per hour AND per EventProduct, because anomalies and the
  //    final join are per EventProduct; ranking across products could push the anomalous
  //    product's sources out of the top 10.
  let PreviousDayLogs = 
      _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
      | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
      | where not(ipv4_is_private(DstIpAddr))
      | project
          TimeGenerated,
          DstIpAddr,
          SrcIpAddr,
          SrcBytes,
          DstBytes,
          DstPortNumber,
          EventProduct
      | extend DateHour = bin(TimeGenerated, timeframe) // round down to the start of the timeframe bin
      | where DateHour in (AnomalyHours) // keep only the hours flagged as anomalous
      | summarize
          HourlyCount = count(),
          TimeGeneratedMax = max(TimeGenerated),
          DestinationIPList = make_set(DstIpAddr, 100),
          DestinationPortList = make_set(DstPortNumber, 100),
          SentBytes = sum(SrcBytes),
          ReceivedBytes = sum(DstBytes)
          by SrcIpAddr, EventProduct, TimeGeneratedHour = DateHour
      | extend
          SentBytesinMB = tolong(SentBytes / 1024.0 / 1024.0),
          ReceivedBytesinMB = tolong(ReceivedBytes / 1024.0 / 1024.0)
      | where SentBytesinMB > bytessentperhourthreshold
      | sort by TimeGeneratedHour asc, EventProduct asc, SentBytesinMB desc
      | extend Rank = row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour or prev(EventProduct) != EventProduct) // rank within each (hour, product) partition
      | where Rank <= 10  // top 10 senders of each partition
      | project
          EventProduct,
          TimeGeneratedHour,
          TimeGeneratedMax,
          SrcIpAddr,
          DestinationIPList,
          DestinationPortList,
          SentBytesinMB,
          ReceivedBytesinMB,
          Rank,
          HourlyCount;
  // PreviousDayLogs block ends here
  TimeSeriesAnomalies
  | join kind = inner (PreviousDayLogs
      | extend AnomalyHour = TimeGeneratedHour)
      on EventProduct, AnomalyHour
  | sort by score desc
  | project
      EventProduct,
      AnomalyHour,
      TimeGeneratedMax,
      SrcIpAddr,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      HourlyCount
  | summarize
      EventCount = sum(HourlyCount),
      startTimeUtc = min(TimeGeneratedMax),
      EndTimeUtc = max(TimeGeneratedMax),
      SentBytesinMB = sum(SentBytesinMB),
      ReceivedBytesinMB = sum(ReceivedBytesinMB),
      SourceIP = take_any(SrcIpAddr),
      SourceIPList = make_set(SrcIpAddr, 10),
      DestinationIPList = make_set(DestinationIPList, 100),
      DestinationPortList = make_set(DestinationPortList, 100)
      by
      AnomalyHour,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      EventProduct
  | project
      EventProduct,
      AnomalyHour,
      startTimeUtc,
      EndTimeUtc,
      SourceIP,
      SourceIPList,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      anomalies,
      score,
      EventCount  
{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "workspace": {
      "type": "String"
    }
  },
  "resources": [
    {
      "apiVersion": "2024-01-01-preview",
      "id": "[concat(resourceId('Microsoft.OperationalInsights/workspaces/providers', parameters('workspace'), 'Microsoft.SecurityInsights'),'/alertRules/5965d3e7-8ed0-477c-9b42-e75d9237fab0')]",
      "kind": "Scheduled",
      "name": "[concat(parameters('workspace'),'/Microsoft.SecurityInsights/5965d3e7-8ed0-477c-9b42-e75d9237fab0')]",
      "properties": {
        "alertDetailsOverride": {
          "alertDescriptionFormat": "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour",
          "alertDisplayNameFormat": "IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels"
        },
        "alertRuleTemplateName": "5965d3e7-8ed0-477c-9b42-e75d9237fab0",
        "customDetails": {
          "anomalies": "anomalies",
          "DestinationIPList": "DestinationIPList",
          "DestinationPortList": "DestinationPortList",
          "EventCount": "EventCount",
          "ReceivedBytesinMB": "ReceivedBytesinMB",
          "score": "score",
          "SentBytesinMB": "SentBytesinMB",
          "SourceIPList": "SourceIPList"
        },
        "description": "'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.\nThe score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'\n",
        "displayName": "Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)",
        "enabled": true,
        "entityMappings": [
          {
            "entityType": "IP",
            "fieldMappings": [
              {
                "columnName": "SourceIP",
                "identifier": "Address"
              }
            ]
          }
        ],
        "eventGroupingSettings": {
          "aggregationKind": "AlertPerResult"
        },
        "OriginalUri": "https://github.com/Azure/Azure-Sentinel/blob/master/Solutions/Web Session Essentials/Analytic Rules/DataExfiltrationTimeSeriesAnomaly.yaml",
        "query": "let startTime = 14d;\nlet endTime = 1d;\nlet timeframe = 1h;\nlet scorethreshold = 5;\nlet bytessentperhourthreshold = 10;\n// calculate avg. eps(events per second)\nlet eps = materialize(_Im_WebSession(starttime=ago(1d))\n    | project TimeGenerated\n    | summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)\n    | summarize round(avg(AvgPerSec))\n    );\nlet summarizationexist  = (\n    union isfuzzy=true \n        (\n        WebSession_Summarized_SrcIP_CL\n        | where EventTime_t > ago(1d) \n        | project v = int(2)\n        ),\n        (\n        print int(1) \n        | project v = print_0\n        )\n    | summarize maxv = max(v)\n    | extend sumexist = (maxv > 1)\n    );\nlet TimeSeriesData = union isfuzzy=true \n        (\n        (datatable(exists: int, sumexist: bool)[1, false]\n        | where toscalar(eps) > 1000\n        | join (summarizationexist) on sumexist)\n        | join (\n            _Im_WebSession(starttime=ago(2d), endtime=now())\n            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n            | where isnotempty(DstIpAddr)\n                and not(ipv4_is_private(DstIpAddr))\n                and isnotempty(SrcBytes)\n            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n            | extend EventTime = TimeGenerated, exists=int(1)\n            )\n            on exists\n        | project-away exists*, maxv, sum*\n        ),\n        (\n        (datatable(exists: int, sumexist: bool)[1, false]\n        | where toscalar(eps) between (501 .. 1000)\n        | join (summarizationexist) on sumexist)\n        | join (\n            _Im_WebSession(starttime=ago(3d), endtime=now())\n            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n            | where isnotempty(DstIpAddr)\n                and not(ipv4_is_private(DstIpAddr))\n                and isnotempty(SrcBytes)\n            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n            | extend EventTime = TimeGenerated, exists=int(1)\n            )\n            on exists\n        | project-away exists*, maxv, sum*\n        ),\n        (\n        (datatable(exists: int, sumexist: bool)[1, false]\n        | where toscalar(eps) <= 500\n        | join (summarizationexist) on sumexist)\n        | join (\n            _Im_WebSession(starttime=ago(4d), endtime=now())\n            | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct\n            | where isnotempty(DstIpAddr)\n                and not(ipv4_is_private(DstIpAddr))\n                and isnotempty(SrcBytes)\n            | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)\n            | extend EventTime = TimeGenerated, exists=int(1)\n            )\n            on exists\n        | project-away exists*, maxv, sum*\n        ),\n        (\n        WebSession_Summarized_SrcIP_CL\n        | where EventTime_t between (ago(startTime) .. now())\n        | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)\n        | project\n            SrcBytesSum=tolong(SrcBytes_d),\n            EventTime=EventTime_t,\n            EventProduct = EventProduct_s\n        )\n    | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;\n// TimeSeriesData block ends here\n//Take only anomalies in TimeSeriesData\nlet TimeSeriesAnomalies = materialize(TimeSeriesData\n    | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')\n    | mv-expand\n        TotalBytesSent to typeof(long),\n        EventTime to typeof(datetime),\n        anomalies to typeof(double),\n        score to typeof(double),\n        baseline to typeof(long)\n    | where anomalies > 0 and baseline > 0\n    | extend AnomalyHour = EventTime\n    | extend\n        TotalBytesSentinMBperHour = round(((TotalBytesSent / 1024) / 1024), 2),\n        BaselineBytesSentinMBperHour = round(((baseline / 1024) / 1024), 2),\n        score = round(score, 2)\n    | project\n        EventProduct,\n        AnomalyHour,\n        TotalBytesSentinMBperHour,\n        BaselineBytesSentinMBperHour,\n        anomalies,\n        score\n    | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day\n        );\n// TimeSeriesAlerts block end here\nlet AnomalyHours = materialize (TimeSeriesAnomalies\n    | project AnomalyHour);\n//Previous day aggregated per hour\nlet PreviousDayLogs = \n    _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))\n    | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)\n    | where not(ipv4_is_private(DstIpAddr))\n    | project\n        TimeGenerated,\n        DstIpAddr,\n        SrcIpAddr,\n        SrcBytes,\n        DstBytes,\n        DstPortNumber,\n        EventProduct\n    | extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour\n    | where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours\n    | extend\n        SentBytesinMB = ((SrcBytes / 1024) / 1024),\n        ReceivedBytesinMB = ((DstBytes / 1024) / 1024)\n    | summarize\n        HourlyCount = count(),\n        TimeGeneratedMax = arg_max(TimeGenerated, *),\n        DestinationIPList = make_set(DstIpAddr, 100),\n        DestinationPortList = make_set(DstPortNumber, 100),\n        SentBytesinMB = tolong(sum(SentBytesinMB)),\n        ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))\n        by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)\n    | where SentBytesinMB > bytessentperhourthreshold\n    | sort by TimeGeneratedHour asc, SentBytesinMB desc\n    | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition\n    | where Rank <= 10  // Selecting Top 10 records with Highest BytesSent in each Hour\n    | project\n        EventProduct,\n        TimeGeneratedHour,\n        TimeGeneratedMax,\n        SrcIpAddr,\n        DestinationIPList,\n        DestinationPortList,\n        SentBytesinMB,\n        ReceivedBytesinMB,\n        Rank,\n        HourlyCount;\n// PreviousDayLogs block ends here\nTimeSeriesAnomalies\n| join kind = inner (PreviousDayLogs\n    | extend AnomalyHour = TimeGeneratedHour)\n    on EventProduct, AnomalyHour\n| sort by score desc\n| project\n    EventProduct,\n    AnomalyHour,\n    TimeGeneratedMax,\n    SrcIpAddr,\n    DestinationIPList,\n    DestinationPortList,\n    SentBytesinMB,\n    ReceivedBytesinMB,\n    TotalBytesSentinMBperHour,\n    BaselineBytesSentinMBperHour,\n    score,\n    anomalies,\n    HourlyCount\n| summarize\n    EventCount = sum(HourlyCount),\n    startTimeUtc = min(TimeGeneratedMax),\n    EndTimeUtc = max(TimeGeneratedMax),\n    SentBytesinMB = sum(SentBytesinMB),\n    ReceivedBytesinMB = sum(ReceivedBytesinMB),\n    SourceIP = take_any(SrcIpAddr),\n    SourceIPList = make_set(SrcIpAddr, 10),\n    DestinationIPList = make_set(DestinationIPList, 100),\n    DestinationPortList = make_set(DestinationPortList, 100)\n    by\n    AnomalyHour,\n    TotalBytesSentinMBperHour,\n    BaselineBytesSentinMBperHour,\n    score,\n    anomalies,\n    EventProduct\n| project\n    EventProduct,\n    AnomalyHour,\n    startTimeUtc,\n    EndTimeUtc,\n    SourceIP,\n    SourceIPList,\n    DestinationIPList,\n    DestinationPortList,\n    SentBytesinMB,\n    ReceivedBytesinMB,\n    TotalBytesSentinMBperHour,\n    BaselineBytesSentinMBperHour,\n    anomalies,\n    score,\n    EventCount\n",
        "queryFrequency": "P1D",
        "queryPeriod": "P14D",
        "severity": "Medium",
        "status": "Available",
        "subTechniques": [],
        "suppressionDuration": "PT1H",
        "suppressionEnabled": false,
        "tactics": [
          "Exfiltration"
        ],
        "tags": [
          {
            "Schema": "WebSession",
            "SchemaVersion": "0.2.6"
          }
        ],
        "techniques": [
          "T1030"
        ],
        "templateVersion": "1.0.1",
        "triggerOperator": "GreaterThan",
        "triggerThreshold": 0
      },
      "type": "Microsoft.OperationalInsights/workspaces/providers/alertRules"
    }
  ]
}