In this post, we show how you can use Amazon Kinesis Data Streams to buffer and aggregate real-time streaming data for delivery into Amazon OpenSearch Service domains and collections using Amazon OpenSearch Ingestion. You can use this approach for a variety of use cases, from real-time log analytics to integrating application messaging data for real-time search. In this post, we focus on the use case of centralizing log aggregation for an organization that has a compliance need to archive and retain its log data.
Kinesis Data Streams is a fully managed, serverless data streaming service that stores and ingests various streaming data in real time at any scale. For log analytics use cases, Kinesis Data Streams enhances log aggregation by decoupling producer and consumer applications, and providing a resilient, scalable buffer to capture and serve log data. This decoupling provides advantages over traditional architectures. As log producers scale up and down, Kinesis Data Streams can be scaled dynamically to consistently buffer log data. This prevents load changes from impacting an OpenSearch Service domain, and provides a resilient store of log data for consumption. It also allows multiple consumers to process log data in real time, providing a persistent store of real-time data for applications to consume. This allows the log analytics pipeline to meet Well-Architected best practices for resilience (REL04-BP02) and cost (COST09-BP02).
OpenSearch Ingestion is a serverless pipeline that provides powerful tools for extracting, transforming, and loading data into an OpenSearch Service domain. OpenSearch Ingestion integrates with many AWS services, and provides ready-made blueprints to accelerate ingesting data into OpenSearch Service domains for a variety of analytics use cases. When paired with Kinesis Data Streams, OpenSearch Ingestion enables sophisticated real-time analytics of data, and helps reduce the undifferentiated heavy lifting of creating a real-time search and analytics architecture.
Solution overview
In this solution, we consider a common use case of centralized log aggregation for an organization. Organizations might consider a centralized log aggregation approach for a variety of reasons. Many organizations have compliance and governance requirements that stipulate what data must be logged, and how long log data must be retained and remain searchable for investigations. Other organizations seek to consolidate application and security operations, and provide common observability toolsets and capabilities across their teams.
To meet such requirements, you need to collect data from log sources (producers) in a scalable, resilient, and cost-effective manner. Log sources may vary between application and infrastructure use cases and configurations, as illustrated in the following table.
Log Producer | Example | Example Producer Log Configuration |
Application Logs | AWS Lambda | Amazon CloudWatch Logs |
Application Agents | Fluent Bit | Amazon OpenSearch Ingestion |
AWS Service Logs | AWS WAF | Amazon S3 |
The following diagram illustrates an example architecture.
You can use Kinesis Data Streams for a variety of these use cases. You can configure Amazon CloudWatch Logs to send data to Kinesis Data Streams using a subscription filter (see Real-time processing of log data with subscriptions). If you send data with Kinesis Data Streams for analytics use cases, you can use OpenSearch Ingestion to create a scalable, extensible pipeline to consume your streaming data and write it to OpenSearch Service indexes. Kinesis Data Streams provides a buffer that can support multiple consumers, configurable retention, and built-in integration with a variety of AWS services. For other use cases where data is stored in Amazon Simple Storage Service (Amazon S3), or where an agent such as Fluent Bit writes data, the agent can write data directly to OpenSearch Ingestion without an intermediate buffer, thanks to OpenSearch Ingestion's built-in persistent buffers and automatic scaling.
Standardizing logging approaches reduces development and operational overhead for organizations. For example, you might standardize on having all applications log to CloudWatch Logs when feasible, and also handle Amazon S3 logs where CloudWatch Logs is unsupported. This reduces the number of use cases that a centralized team needs to handle in their log aggregation approach, and reduces the complexity of the log aggregation solution. For more sophisticated development teams, you might standardize on using Fluent Bit agents to write data directly to OpenSearch Ingestion to lower cost when log data doesn't need to be stored in CloudWatch.
This solution focuses on using CloudWatch Logs as a data source for log aggregation. For the Amazon S3 log use case, see Using an OpenSearch Ingestion pipeline with Amazon S3. For agent-based solutions, see the agent-specific documentation for integration with OpenSearch Ingestion, such as Using an OpenSearch Ingestion pipeline with Fluent Bit.
Prerequisites
Several key pieces of infrastructure are required in this solution to ingest data into OpenSearch Service with OpenSearch Ingestion:
- A Kinesis data stream to aggregate the log data from CloudWatch
- An OpenSearch Service domain to store the log data
When creating the Kinesis data stream, we recommend starting with On-Demand mode. This allows Kinesis Data Streams to automatically scale the number of shards needed for your log throughput. After you identify the steady-state workload for your log aggregation use case, we recommend moving to Provisioned mode, using the number of shards identified in On-Demand mode. This can help you optimize long-term cost for high-throughput use cases.
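The switch from On-Demand to Provisioned mode maps to two Kinesis Data Streams API calls, UpdateStreamMode and UpdateShardCount. The following Python sketch only builds the request parameters; the stream name, ARN, and shard count are hypothetical placeholders, and the boto3 calls that would apply them are shown in comments so the example stays self-contained.

```python
# Build request parameters for moving a Kinesis data stream from
# On-Demand to Provisioned mode once steady-state throughput is known.
# Stream ARN and shard count below are hypothetical placeholders.

STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/central-logs"
STEADY_STATE_SHARDS = 12  # peak shard count observed while in On-Demand mode

# Step 1: switch the capacity mode (Kinesis UpdateStreamMode API).
update_mode_params = {
    "StreamARN": STREAM_ARN,
    "StreamModeDetails": {"StreamMode": "PROVISIONED"},
}

# Step 2: set the shard count to the level observed in On-Demand mode
# (Kinesis UpdateShardCount API).
update_shards_params = {
    "StreamName": STREAM_ARN.split("/")[-1],
    "TargetShardCount": STEADY_STATE_SHARDS,
    "ScalingType": "UNIFORM_SCALING",
}

# With boto3, you would apply these as:
#   boto3.client("kinesis").update_stream_mode(**update_mode_params)
#   boto3.client("kinesis").update_shard_count(**update_shards_params)
print(update_shards_params["StreamName"])  # -> central-logs
```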
In general, we recommend using one Kinesis data stream for your log aggregation workload. OpenSearch Ingestion supports up to 96 OCUs per pipeline, and 24,000 characters per pipeline definition file (see OpenSearch Ingestion quotas). This means each pipeline can support a Kinesis data stream with up to 96 shards, because each OCU processes one shard. Using one Kinesis data stream simplifies the overall process of aggregating log data into OpenSearch Service, and simplifies creating and managing subscription filters for log groups.
Depending on the scale of your log workloads and the complexity of your OpenSearch Ingestion pipeline logic, you may consider additional Kinesis data streams for your use case. For example, you may consider one stream for each major log type in your production workload. Separating log data for different use cases into different streams can help reduce the operational complexity of managing OpenSearch Ingestion pipelines, and lets you scale and deploy changes to each log use case separately when required.
To create a Kinesis data stream, see Create a data stream.
To create an OpenSearch Service domain, see Creating and managing Amazon OpenSearch Service domains.
Configure log subscription filters
You can implement CloudWatch log group subscription filters at the account level or the log group level. In both cases, we recommend creating a subscription filter with a random distribution method to make sure log data is evenly distributed across Kinesis data stream shards.
Account-level subscription filters are applied to all log groups in an account, and can be used to subscribe all log data to a single destination. This works well if you want to store all your log data in OpenSearch Service using Kinesis Data Streams. There is a limit of one account-level subscription filter per account. Using Kinesis Data Streams as the destination also allows you to have multiple log consumers process the account log data when relevant. To create an account-level subscription filter, see Account-level subscription filters.
Log group-level subscription filters are applied on each log group. This approach works well if you want to store a subset of your log data in OpenSearch Service using Kinesis Data Streams, or if you want to use several different data streams to store and process multiple log types. There is a limit of two log group-level subscription filters per log group. To create a log group-level subscription filter, see Log group-level subscription filters.
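A log group-level filter ultimately maps to the CloudWatch Logs PutSubscriptionFilter API. As a sketch, the following Python builds the request parameters with the recommended random distribution; the log group name, filter name, and ARNs are all hypothetical placeholders, and the boto3 call that would apply them is shown in a comment.

```python
import json

# Parameters for the CloudWatch Logs PutSubscriptionFilter API. The
# "Random" distribution spreads records evenly across stream shards.
# All names and ARNs below are hypothetical placeholders.
subscription_filter = {
    "logGroupName": "/aws/lambda/my-application",
    "filterName": "central-log-aggregation",
    "filterPattern": "",  # an empty pattern matches all log events
    "destinationArn": "arn:aws:kinesis:us-east-1:123456789012:stream/central-logs",
    "roleArn": "arn:aws:iam::123456789012:role/cwl-to-kinesis",  # role CloudWatch Logs assumes
    "distribution": "Random",  # recommended; the alternative is "ByLogStream"
}

# With boto3:
#   boto3.client("logs").put_subscription_filter(**subscription_filter)
print(json.dumps(subscription_filter, indent=2))
```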
After you create your subscription filter, verify that log data is being sent to your Kinesis data stream. On the Kinesis Data Streams console, choose the link for your stream name.
Choose a shard with Starting position set to Trim horizon, and choose Get records.
You should see records with a unique Partition key column value and a binary Data column. This is because CloudWatch sends data in .gzip format to compress log data.
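You can decode these records the same way a consumer would. The following self-contained Python sketch decompresses a record; the envelope fields (messageType, logGroup, logStream, logEvents) follow the documented CloudWatch subscription format, and the sample payload is synthetic.

```python
import gzip
import json

def decode_cloudwatch_record(data: bytes) -> dict:
    """Decompress a CloudWatch Logs subscription record read from Kinesis.

    CloudWatch delivers each record as gzip-compressed JSON with fields
    such as messageType, logGroup, logStream, and logEvents.
    """
    return json.loads(gzip.decompress(data))

# Simulate the binary blob CloudWatch would place in the Kinesis Data field.
envelope = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "/aws/lambda/my-application",   # placeholder log group
    "logStream": "2025/01/01/[$LATEST]abcd",
    "logEvents": [{"id": "1", "timestamp": 1735689600000, "message": "hello"}],
}
blob = gzip.compress(json.dumps(envelope).encode("utf-8"))

decoded = decode_cloudwatch_record(blob)
print(decoded["logGroup"])        # -> /aws/lambda/my-application
print(len(decoded["logEvents"]))  # -> 1
```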
Configure an OpenSearch Ingestion pipeline
Now that you have a Kinesis data stream and CloudWatch subscription filters sending data to the data stream, you can configure your OpenSearch Ingestion pipeline to process your log data. To begin, you create an AWS Identity and Access Management (IAM) role that allows read access to the Kinesis data stream and read/write access to the OpenSearch Service domain. To create the pipeline, the management role you use will require iam:PassRole permissions on the pipeline role created in this step.
- Create an IAM role with the following permissions to read from your Kinesis data stream and access your OpenSearch Service domain:
- Give your role a trust policy that allows access from osis-pipelines.amazonaws.com:
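As a sketch of the two policies described above, the following Python builds the trust policy and a representative permissions policy as JSON documents. All ARNs are hypothetical placeholders, and the action list is illustrative; confirm the exact set your pipeline needs against the OpenSearch Ingestion documentation.

```python
import json

# Hypothetical placeholder identifiers.
ACCOUNT_ID = "123456789012"
STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/central-logs"
DOMAIN_ARN = "arn:aws:es:us-east-1:123456789012:domain/central-logs-domain"

# Trust policy: lets the OpenSearch Ingestion service assume the pipeline role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "osis-pipelines.amazonaws.com"},
        "Action": "sts:AssumeRole",
        "Condition": {"StringEquals": {"aws:SourceAccount": ACCOUNT_ID}},
    }],
}

# Permissions policy: read from the stream, write to the domain.
# The action list is a representative sketch, not an exhaustive set.
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards",
                "kinesis:SubscribeToShard",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DescribeStreamConsumer",
            ],
            "Resource": [STREAM_ARN, STREAM_ARN + "/*"],
        },
        {"Effect": "Allow", "Action": "es:DescribeDomain", "Resource": DOMAIN_ARN},
        {"Effect": "Allow", "Action": "es:ESHttp*", "Resource": DOMAIN_ARN + "/*"},
    ],
}

print(json.dumps(trust_policy, indent=2))
```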
For a pipeline to write data to a domain, the domain must have a domain-level access policy that allows the pipeline role to access it. If your domain uses fine-grained access control, the IAM role also needs to be mapped to a backend role in the OpenSearch Service security plugin that allows access to create and write to indexes.
- After you create your pipeline role, on the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
- Choose Create pipeline.
- Search for Kinesis in the blueprints, select the Kinesis Data Streams blueprint, and choose Select blueprint.
- Under Pipeline settings, enter a name for your pipeline, and set Max capacity for the pipeline equal to the number of shards in your Kinesis data stream.
If you're using On-Demand mode for the data stream, choose a capacity equal to the current number of shards in the stream. This use case doesn't require a persistent buffer, because Kinesis Data Streams provides a persistent buffer for the log data, and OpenSearch Ingestion tracks its position in the Kinesis data stream over time, preventing data loss on restarts.
- Under Pipeline configuration, update the pipeline source settings to use your Kinesis data stream name and pipeline IAM role Amazon Resource Name (ARN).
For full configuration information, see the OpenSearch Ingestion documentation for the Kinesis Data Streams source. For most configurations, you can use the default values. By default, the pipeline writes batches of 100 documents every 1 second, and subscribes to the Kinesis data stream from the latest position in the stream using enhanced fan-out, checkpointing its position in the stream every 2 minutes. You can adjust this behavior to tune how frequently the consumer checkpoints, where it starts in the stream, and whether to use polling to reduce costs from enhanced fan-out.
- Update the pipeline sink settings to include your OpenSearch Service domain endpoint URL and pipeline IAM role ARN.
The IAM role ARN must be the same for both the OpenSearch Service sink definition and the Kinesis Data Streams source definition. You can control which data gets indexed into different indexes using the index definition in the sink. For example, you can use metadata about the Kinesis data stream name to index by data stream (${getMetadata("kinesis_stream_name")}), or you can use document fields to index data depending on the CloudWatch log group or other document data (${path/to/field/in/doc}). In this example, we use three document-level fields (data_stream.type, data_stream.dataset, and data_stream.namespace) to index our documents, and create these fields in our pipeline processor logic in the next section:
Finally, you can update the pipeline configuration to include processor definitions that transform your log data before writing documents to the OpenSearch Service domain. For example, this use case adopts Simple Schema for Observability (SS4O) and uses the OpenSearch Ingestion pipeline to create the desired schema for SS4O. This includes adding common fields to associate metadata with the indexed documents, as well as parsing the log data to make it more searchable. This use case also uses the log group name to identify different log types as datasets, and uses this information to write documents to different indexes depending on their use cases.
- Rename the CloudWatch event timestamp to mark the observed timestamp when the log was generated using the rename_keys processor, and add the current timestamp as the processed timestamp when OpenSearch Ingestion handled the record using the date processor:
- Use the add_entries processor to include metadata about the processed document, including the log group, log stream, account ID, AWS Region, Kinesis data stream information, and dataset metadata:
- Use conditional expression syntax to update the data_stream.dataset fields depending on the log source, to control which index the document is written to, and use the delete_entries processor to delete the original CloudWatch document fields that were renamed:
- Parse the log message fields to make structured and JSON data more searchable in the OpenSearch indexes using the grok and parse_json processors:
Grok processors use pattern matching to parse data from structured text fields. For examples of built-in Grok patterns, see the java-grok patterns and the Data Prepper grok patterns.
When it's all put together, your pipeline configuration will look like the following code:
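The assembled pipeline definition is not reproduced here, so the following YAML is an illustrative sketch only, combining the source, processors, and sink described above. Plugin option names can vary across OpenSearch Ingestion versions, and the stream name, Region, role ARN, and domain endpoint are hypothetical placeholders; validate your configuration against the console blueprint before use.

```yaml
version: "2"
kinesis-log-pipeline:
  source:
    kinesis_data_streams:
      streams:
        - stream_name: "central-logs"          # placeholder stream name
      codec:
        json: {}
      aws:
        region: "us-east-1"                    # placeholder Region
        sts_role_arn: "arn:aws:iam::123456789012:role/pipeline-role"  # pipeline role
  processor:
    - rename_keys:
        entries:
          - from_key: "timestamp"
            to_key: "observed_timestamp"       # original CloudWatch event time
    - date:
        from_time_received: true
        destination: "processed_timestamp"     # when OpenSearch Ingestion handled it
    - add_entries:
        entries:
          - key: "data_stream/type"
            value: "logs"
          - key: "data_stream/dataset"
            value: "general"
          - key: "data_stream/namespace"
            value: "default"
  sink:
    - opensearch:
        hosts: ["https://search-my-domain.us-east-1.es.amazonaws.com"]  # placeholder endpoint
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::123456789012:role/pipeline-role"  # same role as source
```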
- When your configuration is complete, choose Validate pipeline to check your pipeline syntax for errors.
- In the Pipeline role section, optionally enter a suffix to create a unique service role that will be used to start your pipeline run.
- In the Network section, select VPC access.
For a Kinesis Data Streams source, you don't need to select a virtual private cloud (VPC), subnets, or security groups. OpenSearch Ingestion only requires these attributes for HTTP data sources that are located inside a VPC. For Kinesis Data Streams, OpenSearch Ingestion uses AWS PrivateLink to read from Kinesis Data Streams and write to OpenSearch Service domains or serverless collections.
- Optionally, enable CloudWatch logging for your pipeline.
- Choose Next to review and create your pipeline.
If you're using account-level subscription filters for CloudWatch logs in the account where OpenSearch Ingestion is running, this log group should be excluded from the account-level subscription. Otherwise, OpenSearch Ingestion pipeline logs could cause a recursive loop with the subscription filter, leading to high volumes of log data ingestion and cost.
- In the Review and create section, choose Create pipeline.
When your pipeline enters the Active state, you'll see logs begin to populate your OpenSearch Service domain or serverless collection.
Monitor the solution
To maintain the health of the log ingestion pipeline, there are several key areas to monitor:
- Kinesis Data Streams metrics – You should monitor the following metrics:
- FailedRecords – Indicates an issue with CloudWatch subscription filters writing to the Kinesis data stream. Reach out to AWS Support if this metric stays at a non-zero level for a sustained period.
- ThrottledRecords – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch.
- ReadProvisionedThroughputExceeded – Indicates your Kinesis data stream has consumers drawing more read throughput than the shard limits supply, and you may need to move to an enhanced fan-out consumer strategy.
- WriteProvisionedThroughputExceeded – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch, or that your log volume is being unevenly distributed across your shards. Make sure the subscription filter distribution strategy is set to random, and consider enabling enhanced shard-level monitoring on the data stream to identify hot shards.
- RateExceeded – Indicates that a consumer is incorrectly configured for the stream, and there may be an issue in your OpenSearch Ingestion pipeline causing it to subscribe too often. Check your consumer strategy for the Kinesis data stream.
- MillisBehindLatest – Indicates the enhanced fan-out consumer isn't keeping up with the load in the data stream. Check the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards.
- IteratorAgeMilliseconds – Indicates the polling consumer isn't keeping up with the load in the data stream. Check the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards, and check the polling strategy for the consumer.
- CloudWatch subscription filter metrics – You should monitor the following metrics:
- DeliveryErrors – Indicates an issue with the CloudWatch subscription filter delivering data to the Kinesis data stream. Check the data stream metrics.
- DeliveryThrottling – Indicates insufficient capacity in the Kinesis data stream. Check the data stream metrics.
- OpenSearch Ingestion metrics – For recommended monitoring for OpenSearch Ingestion, see Recommended CloudWatch alarms.
- OpenSearch Service metrics – For recommended monitoring for OpenSearch Service, see Recommended CloudWatch alarms for Amazon OpenSearch Service.
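To act on the Kinesis metrics above, you can define CloudWatch alarms. The following Python sketch builds representative PutMetricAlarm parameters for ThrottledRecords; the alarm name, stream name, period, and thresholds are illustrative assumptions to tune for your workload, and the boto3 call that would create the alarm is shown in a comment.

```python
# Representative parameters for the CloudWatch PutMetricAlarm API, covering
# the ThrottledRecords metric described above. Names and thresholds are
# illustrative placeholders; tune them for your workload.
throttled_records_alarm = {
    "AlarmName": "central-logs-throttled-records",          # placeholder alarm name
    "Namespace": "AWS/Kinesis",
    "MetricName": "ThrottledRecords",
    "Dimensions": [{"Name": "StreamName", "Value": "central-logs"}],  # placeholder stream
    "Statistic": "Sum",
    "Period": 300,               # evaluate over 5-minute windows
    "EvaluationPeriods": 3,      # require 3 consecutive breaching windows
    "Threshold": 0,
    "ComparisonOperator": "GreaterThanThreshold",
    "TreatMissingData": "notBreaching",
}

# With boto3:
#   boto3.client("cloudwatch").put_metric_alarm(**throttled_records_alarm)
print(throttled_records_alarm["MetricName"])  # -> ThrottledRecords
```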
Clean up
Be sure to clean up unwanted AWS resources created while following this post to prevent additional billing. Follow these steps to clean up your AWS account:
- Delete your Kinesis data stream.
- Delete your OpenSearch Service domain.
- Use the DeleteAccountPolicy API to remove your account-level CloudWatch subscription filter.
- Delete your log group-level CloudWatch subscription filters:
- On the CloudWatch console, select the desired log group.
- On the Actions menu, choose Subscription Filters and Delete all subscription filter(s).
- Delete the OpenSearch Ingestion pipeline.
Conclusion
In this post, you learned how to create a serverless ingestion pipeline to deliver CloudWatch logs in real time to an OpenSearch Service domain or serverless collection using OpenSearch Ingestion. You can use this approach for a variety of real-time data ingestion use cases, and add it to existing workloads that use Kinesis Data Streams for real-time data analytics.
For other use cases for OpenSearch Ingestion and Kinesis Data Streams, consider the following:
To continue improving your log analytics use cases in OpenSearch, consider using some of the pre-built dashboards available in Integrations in OpenSearch Dashboards.
About the authors
M Mehrtens has been working in distributed systems engineering throughout their career, working as a Software Engineer, Architect, and Data Engineer. In the past, M has supported and built systems to process terabytes of streaming data at low latency, run enterprise Machine Learning pipelines, and created systems to share data across teams seamlessly with varied data toolsets and software stacks. At AWS, they are a Sr. Solutions Architect supporting US Federal Financial customers.
Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-centered technologies, and is based out of Seattle, Washington.
Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.