Introduction
In recent years, a new field has been developing in the world of computer science. As more and more internet connected devices, collectively called the Internet of Things or IoT for short, become available, so does the demand for systems to handle and process the data from those devices. Because this is a recent development in the IT world, such systems are still being actively developed, and there is an emerging need to simplify the process at every step, as IoT is no longer a niche market but is becoming commonplace in every industry. Businesses and individuals alike want to gather all the data they possibly can in order to extract meaningful information and additional value from it. The central idea of IoT is to combine computers and networks to monitor and manage our systems. With the advances in electronics and electronic devices, people now want to measure everything in real time, from blood sugar levels and other vital parameters to equipment performance and utilization at scale.

Handling such volumes of data is known as Big Data, which is based on the principles of data aggregation, storage, analysis and presentation. The backbone of such systems is hardware that needs to be easily accessible, cost effective and simple to use. In the world of computer science and IT, abstraction is a powerful concept in which complex issues are wrapped up so that only the necessary parts are exposed to the user, who can then combine these new and larger building blocks into larger concepts. One such recent development is the abstraction of data centers, which no longer offer only hardware and direct access to physical machines but deliver them as simple managed packages. This is now known as cloud computing.
With abstraction as a core concept for simplifying complex systems, which are then reused to create larger and more complex systems, it follows that the emerging concepts of IoT and Big Data require new systems to be put in place to allow further growth and innovation. As big data, cloud computing and IoT are still relatively new, such systems are still lacking and need to be developed, all while demand is growing strongly. The current problem is the lack of generalized solutions to tackle these new requirements: while foundational elements do exist, both in terms of hardware and software infrastructure, there is still a lack of abstraction for complete end to end IoT solutions that can be used to create higher order systems.
This thesis proposes building a solution that encompasses all the essential requirements of IoT as a generalized system, which can be further extended to fit the various use cases IoT enables. The data generated by such devices is called telemetry and is generally understood to be small measurements of key parameters, tracked over time and collected in a central system from all deployed devices, where they can then be further processed to extract valuable information. One of the core aspects of this is the timeliness of information: in the world of IoT, real time feedback from the stream of incoming data is desired in order to react on time. As such, a system for data ingestion, processing, storage, analytics and management is required, all while remaining flexible for further extension and use.
Ingestion itself serves a twofold role in IoT: one requirement is pure aggregation of data, the other is the need for device-to-device communication, also known as machine to machine or M2M communication. Currently the most widely accepted interface is MQTT as a standardized protocol. MQTT is a very lightweight protocol aimed at reducing the overall complexity and data overhead of device communication, especially in power and bandwidth constrained environments. The second part of data ingestion is the centralized hub, which needs to be highly scalable and fault tolerant in order to guarantee data delivery in large volumes.
The second key component of an IoT system is processing the ingested data in real time or very close to it, also known as near real time. This process includes parsing, refining, aggregating and structuring the data to be stored, which can then be used for analysis and visualization to infer new information and knowledge. This has been a problem in the Big Data industry for a while and has been tackled with several approaches. Processing constantly streaming data while expecting real time results is very difficult, which will be explained later in more detail. In essence it boils down to a trade off between correctness and timeliness of the results.
Storage on its own might seem simple, however handling data at scale requires advanced approaches both in terms of hardware, that is provisioning resources and storing and managing them safely, which is nowadays mostly abstracted by cloud providers, and in terms of logical requirements such as cost effectiveness and data organization for efficient retrieval and analysis later on. Depending on the use case this problem has now been largely solved, and efficient and safe methods for storing and processing large amounts of data exist, as they have been the underpinning of Big Data, which in turn was a prerequisite for many segments and industries, including IoT.
The last piece of an IoT system is data presentation and analysis, especially at scale. While general approaches already exist, this is still an expanding field, as large scale analytics extends beyond simple hardware and logical constraints and more cost and time effective methods are constantly required. A large portion of analytics depends on how the data is organized and stored, but it also depends on being coupled with efficient processing systems and on the results being properly visualized. The recent advent of artificial intelligence and machine learning further emphasizes this part. The constant push for more efficient processing has brought substantial improvements in recent years, both in software and in pushing hardware manufacturers to provide better optimized solutions.
When all these segments are put together, it is possible to create a scalable and efficient IoT telemetry aggregation and analysis platform which can handle thousands or even millions of devices constantly pushing data and measurements to a centralized hub, where the data can be refined and analyzed to produce valuable insights, deliver significant value both in operational efficiency and business intelligence, and serve as a platform for further development.
Aims and Objectives
The aim of this thesis is to present the design and implementation process of a highly scalable and fault resilient telemetry aggregation and processing platform, tailored towards the internet of things. The purpose is to show the inner workings of such systems and to implement them using novel technologies that increase performance while driving down cost. The core objectives are:
- A highly scalable, performant ingestion mechanism for data
- Real time data processing and aggregation
- Cold and hot storage of data and organization for cost and performance effective analysis
- Front end for data exploration and management
- A system aiding software development and system operations
- Key metrics as to why this implementation is an improvement on the current state
Hypothesis
There is a lack of high performance, cost effective solutions for collecting, persisting and analyzing telemetry data, especially in the field of IoT, whose backbone is the data collection process. It is hypothesized that the collection and storage of data can be done much more efficiently and cost effectively, while increasing the resolution of the data collected, compared to current solutions.
The key attributes for different providers are the following:
- Number of metrics
- Number of clients
- Message throughput (Bandwidth)
- Data resolution
- Data retention
Preliminary research shows that the most constrained core attributes are message throughput and data resolution, followed by data retention. The number of metrics and number of clients are mostly derived constraints that indirectly limit throughput.
To formulate this as a hypothesis question: can data ingestion, processing and storage be addressed so that they are significantly more efficient and cost effective than current solutions, thus lowering the barrier to entry and improving the usability of such systems?
Literature Review
A Cisco report forecasts that by the year 2020 the average person will have 6.58 connected devices, which, with an estimated 7.6 billion people on the planet, amounts to over 50 billion internet connected devices [1]. Currently it is estimated that there are over 25 billion connected devices. One of the keys to such explosive growth is the technical support and infrastructure for those devices and systems. Cisco, as a leader in the networking industry, states that the key underpinning of those devices are the networks that support them. It is important to note that such devices require simple yet effective network implementations, both from a hardware and a software standpoint.
The report from the Internet Society makes it evident that besides the infrastructural difficulties that need to be overcome, there are also security, regulatory and interoperability issues to be addressed. As this is an emerging market, these points still rest on the individual implementations of IoT devices and the systems they employ. Security needs to be a key point, as the purpose of IoT is gathering massive amounts of data which can be analyzed to provide insightful details that would otherwise go unnoticed. However, privacy is a big issue, as we do not want personal data to leak to third parties or to be used with malicious intent. Secondly, legal frameworks need to be introduced governing who owns the data and how it can be used, as well as measures to enforce such regulation. Lastly, there is the technical aspect that needs to be addressed in terms of standardization and interoperability concerns. Currently there are several informal standards bodies which aim to aid the cause of interoperability, however no legal body exists that manages those standards. Most standards today are being developed by individual groups and are then adopted by the market [2].
The Internet Society also describes the general idea of IoT as the concept of combining computers and networks to monitor and control devices. And while the general idea has been known and used for decades, it is only now becoming an important point to address, as it scales beyond the capabilities of simple systems. The report names several example applications of IoT: on the human body, tracking various vital and health parameters and reporting back in real time; at home, where appliances and their usage are controlled and measured; in retail, from storage to checkout, tracking items and customers to provide the best experience; all the way to industrial applications like oil rig and power plant monitoring, as well as high precision applications such as equipment and operational efficiency tracking in factories.
As data volumes are rapidly increasing with the explosive growth of internet connected devices, it is important to be able to aggregate and process the data in a meaningful way. To handle this, new systems need to be applied to what is now known as Big Data, a term used to describe vast amounts of both structured and unstructured data which is then refined and processed using approaches from Big Data Analytics to get a deeper understanding and extract intelligence from the singular data points recorded. The report from SAS states that there are three important considerations for such a system. First is data management for big data: as data is now treated as an asset, it needs to be handled appropriately, and at this scale special systems need to be employed, since a single personal computer is no longer capable of handling such volumes of data storage. Secondly, there is the performance aspect of using such data. The purpose of IoT is ideally to provide real time insights into the operations being tracked and measured, allowing for appropriate responses. Such requirements extend beyond collecting data and storing it offline for later processing, where many hours can be spent analyzing a small batch of events to uncover additional meaning from that time frame; now the data needs to be seen in real time and insights provided as soon as possible. Lastly, there is the factor of management and deployment. As numerous sensors are constantly being deployed, it is no longer viable to do "one off" setups; a more generalized approach is needed to allow for rapid expansion and the inclusion of new units within the system to expand its reach and capabilities [3].
With the demand for systems that extend beyond the capabilities of a single computer or server, the global market has responded with a supply of data centers which allow individuals and companies to rent hardware equipment for their needs. As demand grew and customer requirements increased, so did the offered services expand, and classic data centers moved to more complex systems. Nowadays, such systems are all grouped under the umbrella name "Cloud Computing". What characterizes cloud computing is that beyond renting hardware, providers offer additional services on top to make usage easier and to expand beyond just renting physical machines. Leading cloud providers such as Google with its Cloud Platform, Amazon with AWS and Microsoft with Azure, among many others, now offer services such as on demand computing power that is flexible in terms of CPU power, available memory and disk storage, managed environments which are abstracted computing units that do not need to be set up or managed and can be used as is from the start, elastic storage provisioning where available disk storage expands based on current usage, automatic scaling based on load, and many more such as machine learning interfaces. Beyond offering managed services, they also provide service agreements which guarantee certain uptime and performance figures, which helps in developing scalable and performant systems that can handle ever increasing loads in a reliable and cost effective manner [4].
Diving into implementation, it is important to understand the individual segments in more detail. The system design and development process sections evaluate and discuss the details of such a system and the architectural choices made. One important decision, besides implementation details, is the deployment and management of such a large scale system. Kubernetes is a well known tool for container orchestration, which is, simply put, an abstraction layer and management platform for provisioning and configuring server instances based on preset rules, allowing systems to be deployed at scale with very little operational overhead using containerized applications such as those built with Docker [5].
A key part of any telemetry system is the ability to ingest data and communicate it to the system. The industry standard solution is to use MQTT as the messaging protocol due to its light weight and adaptability, but also because it is well suited not only for a client server architecture but also for M2M communication [6]. However, an ingestion system is not complete by only accepting inbound messages; it relies on properly handling data coming in at scale with reliability and high throughput. A common system employed for this task is Kafka, a tool that originated from LinkedIn's internal needs and was then published as an open source project and contributed to the Apache Foundation, like many other tools used throughout this project. It is a tool made for large scale, durable and persistent commit logs and messaging queues [7].
The second part of every IoT platform is the processing and analysis system, which is generally built as a lambda architecture to meet the needs of both correctness and timeliness of results [8]. Recently, however, a new development in the field allows a single processing pipeline to handle both batch and streaming data, addressing both issues while simplifying the overall system. The project, named Dataflow, was contributed by Google and now lives under the Apache Foundation as project Beam [9].
The final part of a telemetry platform is storing the data and being able to analyze it. Considering that all incoming data is some form of time series data, the focus can be placed on databases providing solutions for those use cases. The key points are handling large volumes of data while remaining cost effective. Facebook, for its own needs, developed such a database to handle massive amounts of data while providing significant analytical performance; the project was open sourced and is now hosted and developed under the Apache Foundation as the Cassandra database [10]. Another development, and one of the key underpinnings of big data and analytics, is Google's work on BigTable and Dremel, which form the basis of many big data processing tools such as Hadoop, Spark, Flink, HBase and many others now widely used for large scale data storage and processing [11, 12, 13].
Problem Description
A telemetry platform for the internet of things is a very complex system that needs to address several key points. For a complete pipeline, the telemetry system needs to support a mechanism for reliable data ingestion at scale, a data processing system that will structure the data and perform further analysis on it in real time, and finally persist the data reliably and in a form that can be analyzed later. Another key component is making the data explorable and available to end users. Each of these subtopics needs to be highly scalable, as fault tolerant as possible, and as close as possible to real time when delivering results. As each of these components is very complex on its own, each subsystem will be addressed in detail in the following sections.
Data Ingestion
Data ingestion is one of the cornerstones of an IoT telemetry platform. In order to be able to analyze, process, store and view the data, the system has to have an entry point for the data. The process of collecting data can be broken down into the following segments: communication channel and protocol, security, internal handling of the received data, scalability and fault tolerance.
The first question that needs to be addressed for data ingestion is the choice of connections to the server and the protocols used to communicate. Since IoT devices are generally more constrained in terms of processing power, power consumption and network connectivity, these parameters have to be taken into account. Since there is no official regulatory or standards body for IoT devices at the time of writing, the choice of communication channel falls upon the IoT device manufacturers and the software developers who want to use those devices. Therefore the choice of communication channel and protocol is typically driven by technical constraints and mainstream market usage. Another consideration is that IoT devices do not necessarily communicate only with a centralized server but also between themselves, in what is known as machine to machine or M2M communication.

With these constraints in mind, we need to evaluate the options currently on the market. One of the mainstream options is to reuse the existing technical infrastructure used online for serving websites and other services, as it is well known, has strong support and is the easiest to get started with. This means relying on TCP as a low level protocol for strong data delivery guarantees and HTTP as the communication protocol. However, on further evaluation, this method of communication is not well suited for IoT for several technical reasons. While TCP is well suited for data transport across different mediums because of its strong delivery guarantees, HTTP, which is layered on top of it as the standard by which IoT devices and edge ingestion servers understand each other, is not well suited for the task. For every message that needs to be sent, a new connection needs to be set up, which involves several back and forth interactions between the device and the server, and once it is set up, data transfer carries very high overhead from the HTTP protocol, which in the majority of cases can be larger than the message itself. This leads to repeated connection setups and teardowns, which are resource intensive, and the significant overhead of HTTP may introduce additional cost for end users or limit the total throughput they can achieve, which is especially troublesome for very bandwidth constrained devices. The last issue is that the data is transferred in plain text and is unencrypted, which presents privacy and security issues. To encrypt the data we need to use HTTPS, the secure, encrypted version of HTTP, which significantly complicates the process of setting up a connection and adds further data overhead. Therefore we need to move to more specialized solutions.
The two most popular alternatives are MQTT and CoAP. MQTT is currently the most popular protocol in use, has proven successful in many applications and is shown to work at large scale, one example being Facebook's Messenger application, which serves millions of concurrent users and billions of messages sent every day [14]. MQTT is also based on TCP but is very lightweight and has a very simple connection setup. It has a very low data overhead of only 2 bytes, which lends itself to very network constrained environments, and it implements its own delivery guarantees, allowing the device to decide at run time what delivery guarantees it needs by specifying a quality of service level (QoS). With QoS 0 the protocol behaves more like UDP than TCP, as it is fire and forget: the device only sends the message, which might or might not be delivered. QoS 1 guarantees at least once delivery, which is sufficient for most use cases, as it is lightweight enough for its intended purpose yet offers strong enough guarantees that there is no data loss. Finally, there is QoS 2, which guarantees exactly once delivery; it is very costly performance wise and is generally not used, as the duplicated data that might occur with QoS 1 can be filtered out at later stages of the telemetry platform without much impact. MQTT also offers twofold security measures: one is using SSL/TLS similarly to HTTP, but far less often, thus reducing the overhead, and encryption of the content itself is also possible since the protocol is payload agnostic.
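To illustrate, the following sketch shows how a constrained device might publish a single telemetry reading over MQTT with QoS 1 using the paho-mqtt client library (1.x API); the broker address, port, topic and payload layout are placeholder assumptions rather than part of the platform specification.

```python
# Minimal sketch: publishing one telemetry reading over MQTT with QoS 1
# (at least once delivery) over TLS, using the paho-mqtt 1.x client API.
# Broker address, topic and payload layout are illustrative placeholders.
import json
import ssl

import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="sensor-001")
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)   # TLS on top of MQTT, as discussed above
client.connect("broker.example.com", 8883)
client.loop_start()                           # background network loop handles acknowledgements

payload = json.dumps({"ts": 1559347200, "temperature": 21.4})

# qos=0 -> fire and forget, qos=1 -> at least once, qos=2 -> exactly once
info = client.publish("telemetry/sensor-001/environment", payload, qos=1)
info.wait_for_publish()                       # returns once the broker has acknowledged (PUBACK)

client.loop_stop()
client.disconnect()
```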
CoAP is the second most popular alternative and offers similar mechanics to MQTT while managing resources very well. However, it is based on UDP and all delivery guarantees are implemented in software, presenting a larger performance overhead on the device for network handling; it also occupies more memory due to its larger footprint, leaving less room for other application code. It offers security only through payload encryption, which can be prone to weaknesses, as its own implementation is not as well tested as industry standards like SSL and TLS.
Once we settle on a protocol for transmitting the messages and the necessary security options, we need to consider how the data is handled internally once it arrives at the server and how scalability and fault tolerance can be implemented. Building a scalable application means the application can be deployed across multiple servers and function as a single unit offering its services to a large number of end users and devices. For applications to scale in this way, they need to be implemented in one of two ways. The first option applies when the individual servers do not need to communicate with each other and can be deployed independently, expanding capacity without much trouble as they are self contained. An example might be a web server serving static content, where the content is replicated across all servers and can be served to any client from any machine regardless of state. However, this is often not fully possible, and the services need to communicate with each other as they share state, such as user sessions and computations done on one unit that need to be available to other units as well. This introduces further complexity and some internal communication overhead which needs to be carefully considered.

Once we bring fault tolerance into the equation, a server unit that receives a message from a client needs to persist that message before acknowledging it, so that even if the node fails before the message is pushed further down the pipeline, it will not be lost. If the node does fail in the meantime, the message will either not be saved and not acknowledged, which triggers a resend under an at least once delivery policy, or it will be persisted but not acknowledged, which still satisfies the at least once delivery principle; fault tolerance is thus preserved. To provide the necessary mechanism for sharing state across the deployed units, two options are available. One is a so called master slave configuration, where several master units directly share state between each other, while replication units hold direct or parity copies of the master servers to provide redundancy in case of failure and the possibility to elect or recreate a new master node. The alternative is to provide a shared memory space that can be used by all nodes in the pool, which transfers all the fault tolerance responsibility onto the storage service. For edge servers that accept incoming connections and data, this is the preferred approach, as it allows for higher performance on the servers themselves, and thus higher throughput and better cost effectiveness, while the storage and memory layers often employ master slave replication strategies, since these services usually experience less load, handle simpler requests, and are easier to replicate than edge servers.
Finally, there is the last step of handing the data off for processing and permanent storage. This process follows the same at least once delivery policy towards the consumer service, which will deduplicate the data further down the pipeline if duplication does occur. Messages are persisted until it is acknowledged that they have been consumed; at that point, fault tolerance becomes the responsibility of the consumer, and once the client has received an acknowledgement we can be certain that the received data will be processed.
Processing and Analysis
Processing and analysis is an integral part of collecting data. There are two stages of data processing: once when the data arrives, and later on demand. The on demand portion will be discussed in a later section, as it differs significantly in architecture and approach from processing at arrival. There are several topics to address when talking about data processing in an IoT telemetry platform, but before diving into architectural details one must understand the setting and the kind of data being dealt with.
As is evident from the previous section, the system is dealing with a large influx of data constantly coming in, which means the data is being continually processed and there is no point at which we can wait for all data to arrive before processing it later. Processing data as it arrives is called online data processing, and it deals with streaming data. Processing streams of data imposes several challenges. When data is coming in continuously, two core aspects need to be considered: correctness of results and timeliness. Correctness is concerned with the fact that with streaming data it is hard to know when a given dataset is complete so that it can be processed to produce accurate results. For example, consider streaming event data that has no explicit completion signal but is deemed one set of data once no data has arrived for that set for longer than some predefined period. In that case, how do we handle late data, which can occur for a multitude of reasons such as network and system performance issues? Do we simply drop the events, do we aggregate them into a new set, or do we revisit the original set and extend its lifetime even though it was marked as completed in the meanwhile; what is the correct result? Since we do not want to lose data, we need to either create a new set or update the existing one. If we create a new set, our results might be skewed, as a significant event might have been placed in the wrong set, losing accuracy. If we want to update the existing set, we need to keep it from completing its processing, as once it leaves the pipeline it is hard to update; but then how long do we wait before making a cut, which hurts the timeliness of results, and what happens when data is still late even after the prolonged waiting period? Since these points need to be addressed without data loss in the pipeline, and results are needed as quickly as possible, some accuracy must be sacrificed in order to achieve near real time results without dropping old data.
As we are collecting data, it is important that the end results are accurate, so additional mechanisms are needed to impose this correctness. We therefore need a system that will offer full accuracy, even if somewhat later. This is called eventual consistency, as the results become accurate eventually, after some delay that accounts for these edge cases, which is achieved by reprocessing the data at a later point in batches. Until recently, the only architecture used to handle such cases was the lambda architecture, in which there are two parallel pipelines: a streaming pipeline with nearly accurate results in near real time, and a batch pipeline that processes data in batches and later updates the results to reflect the true state. Recently, however, there has been a movement to merge these two pipelines into a single one offering both batch and stream processing at the same time, thus reducing correctness and timeliness issues as well as cost.
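As a concrete illustration of this unified model, the following Apache Beam (Python SDK) sketch windows a keyed stream of readings into one minute event time windows, emits a result when the watermark passes, and re-emits an updated, accumulated result for each late element instead of dropping it. The window size, lateness bound and aggregation are assumptions chosen for the example.

```python
# Sketch of windowing with late data handling in the unified (Beam/Dataflow)
# model. Window size, allowed lateness and the aggregation are illustrative.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark


def windowed_means(readings):
    """readings: PCollection of (device_id, value) pairs carrying event timestamps."""
    return (
        readings
        | "Window" >> beam.WindowInto(
            window.FixedWindows(60),                     # 1 minute event time windows
            trigger=AfterWatermark(late=AfterCount(1)),  # fire at watermark, refire per late element
            allowed_lateness=600,                        # keep windows open 10 minutes for stragglers
            accumulation_mode=AccumulationMode.ACCUMULATING)
        | "MeanPerDevice" >> beam.combiners.Mean.PerKey()
    )
```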
Once the technical and architectural considerations are made, the processing itself needs to be understood for the system to be effective. When data arrives, we first need to make sure that we understand what was received and that the data is correct and can be processed in our pipeline. If the data is malformed or cannot be processed, there are several options, but the two main approaches are to either drop the bad data and possibly record the fault somewhere, or to separate it into a different data set which can then be recovered or otherwise processed, possibly informing the client or device that there was a fault, which can then be handled depending on the requirements. After this initial preprocessing, the individual data points need to be organized into a meaningful collection that can then be processed, after which we can calculate features of interest, such as minimums, maximums, averages and rates, or even perform anomaly detection, depending on the use case. Finally, the data needs to be persisted, which is handled by the next layer. Before completing the pipeline, however, we need to make sure the process is fault tolerant. Since stream processing can often be done in parallel, multiple units of the pipeline can run together with no or minimal state sharing. To ensure fault tolerance, another acknowledgement process is employed which marks data as reserved when it is pulled into the consumer and only marks it as completed when it exits the pipeline into the storage layer. Therefore, if data is not marked as completed in time due to a fault, another consumer can pick it up and continue processing without any data being lost.
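The validation step with a separate path for malformed data can be expressed in the same pipeline model. The sketch below, again using the Beam Python SDK, parses raw JSON messages and routes anything unparseable to a tagged "invalid" output rather than dropping it; the field names and payload format are assumptions for illustration.

```python
# Sketch of the initial validation step with a "dead letter" side output for
# malformed messages. Field names and the JSON payload format are assumed.
import json

import apache_beam as beam
from apache_beam import pvalue


class ParseTelemetry(beam.DoFn):
    def process(self, raw):
        try:
            msg = json.loads(raw)
            yield (msg["device_id"], float(msg["value"]))
        except (ValueError, KeyError, TypeError):
            # Route bad records to a separate collection instead of silently dropping them.
            yield pvalue.TaggedOutput("invalid", raw)


def split_valid_invalid(raw_messages):
    outputs = raw_messages | beam.ParDo(ParseTelemetry()).with_outputs("invalid", main="valid")
    # outputs.valid feeds the windowing and aggregation described above,
    # outputs.invalid can be persisted for inspection or recovery.
    return outputs.valid, outputs.invalid
```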
Data Storage
The final step for a complete telemetry platform is to persist the data. While persisting data is generally a solved problem, even for large scale systems, with strong replication strategies and data commitment through write and replication acknowledgements, it still needs some special considerations.
The challenge here is to store the data so that it can later be reused and queried to provide insights and extract value. Another part of data persistence is creating mechanisms, commonly known as backups, that allow for recovery in case of severe failures, as we never want to lose data. For backups we need separate cold storage systems that are highly resilient to large scale failures such as data centers burning down or other catastrophic events. They are called cold because they are not actively used, only in case of certain events. The data does not necessarily have to be used for backup alone; it can also be used for testing and optimizing the system outside of production, or to recalculate results if new metrics are introduced. Fault resilience is introduced both locally and globally: locally, strong replication mechanisms ensure that the failure of one or several servers does not result in data loss or corruption, and globally, the data is distributed and replicated across several regions and data centers to provide resilience against catastrophic events.
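As one possible shape for such a cold store, the sketch below archives a batch of raw messages to an object storage bucket, partitioned by ingestion date so that later reprocessing can target a time range. Google Cloud Storage is assumed here only because GCP is the provider chosen later in this thesis; the bucket and object naming scheme are illustrative.

```python
# Sketch of archiving raw message batches to object storage as cold/backup
# storage. The bucket name and object layout are placeholder assumptions.
import datetime

from google.cloud import storage


def archive_batch(raw_messages, bucket_name="telemetry-cold-storage"):
    """raw_messages: list of bytes payloads exactly as they were received."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    # Partitioning archives by date keeps later reprocessing and recovery targeted.
    object_name = datetime.datetime.utcnow().strftime("raw/%Y/%m/%d/%H%M%S.ndjson")
    blob = bucket.blob(object_name)
    blob.upload_from_string(b"\n".join(raw_messages))
    return object_name
```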
The other type of storage, called hot storage, is considerably more complex, as it requires a deeper understanding of the data itself, unlike cold storage which can simply persist the raw messages in any aggregate format that is later readable. To be performant, we first need to recognize that we are dealing with time sensitive data and that all data points are distributed in time. Therefore, an efficient method for storing time series data is needed. Large scale time series databases already exist and have many implementation features well suited to the IoT telemetry use case. Time series databases can significantly compress data by storing only the deltas between timestamps and values for any unique time series, significantly reducing the on disk size but also improving performance when fetching the data, as there is less data to transfer over the network to the consumer.
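The core idea behind that compression can be illustrated in a few lines: instead of absolute timestamps and values, only the differences from the previous sample are kept, and for a sensor reporting at a fixed interval these deltas are small and highly repetitive, so they compress very well. This is a conceptual sketch, not the storage format of any particular database.

```python
# Conceptual sketch of delta encoding for a time series: store the first sample
# in full, then only the differences from the previous sample.
def delta_encode(samples):
    """samples: list of (timestamp, value) pairs, sorted by timestamp."""
    encoded = []
    prev_ts, prev_val = None, None
    for ts, val in samples:
        if prev_ts is None:
            encoded.append((ts, val))                        # first sample stored in full
        else:
            encoded.append((ts - prev_ts, val - prev_val))   # small, repetitive deltas afterwards
        prev_ts, prev_val = ts, val
    return encoded


# A sensor reporting every 10 seconds produces mostly constant, tiny deltas:
# delta_encode([(1000, 21.0), (1010, 21.5), (1020, 21.5)])
# -> [(1000, 21.0), (10, 0.5), (10, 0.0)]
```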
Data Exploration
While data storage is the last step of a complete telemetry platform in terms of handling and persisting the data, without insight into the resulting metrics and analysis of the data it is impossible to extract any value from what has been captured. Data exploration in itself is a simple affair, as it is well studied, and for most use cases it can be done with well known data presentation methods such as time series charts, maps or even raw data tables. The data can be further enriched with additional aids, such as derived data or metadata presented within the context of the current view. However, the technical challenges of visualizing data, especially at high volumes and in near real time, are a complex matter that depends on integration at various steps of the telemetry platform.

The first challenge is dealing with a large number of simultaneous users. This issue can be addressed by horizontally scaling the presentation servers and load balancing by user session, as the presentation servers are stateless and independent of each other. A deeper scaling issue, however, is the process of retrieving the data from the storage layer, for which an additional layer needs to be introduced. This layer addresses query optimizations, which are tightly coupled to the data organization in the storage layer. A common issue with large scale systems is that they only offer full table scans and single row lookups, meaning that for a query we either need to provide a row key for an efficient lookup or perform a complete table scan to find the relevant data. As the row keys are not known beforehand, we would still need to make an at best slightly optimized query to get the row keys before the final fetch of the data. This can be addressed in a two step process. First, the issue of large tables and lengthy scans is tackled by partitioning tables by date and time, so that for a given time series, if the required date and time ranges are known, only a minimal subset of the table partitions needs to be queried to find the required results. Also, by spreading the table partitions across storage nodes, we avoid the so called hot node issue, where due to the data organization a disproportionate amount of frequently used data might end up on a single node, creating a bottleneck as that node serves a large portion of the requests. This step is mostly implemented in the storage layer before being coupled to the query layer. Second, an external row key index, also known as a reverse search index, is employed for prefetching the row keys in advance. This index stores row keys together with matching searchable indexed metadata about the row whenever a new entry is received, so when a query arrives the exact row keys needed can be looked up without querying the database, significantly reducing the load on the storage layer and allowing it to perform more important operations such as retrieving or aggregating telemetry data.

Once the query layer is in place, data can be efficiently fetched in a well structured form from the storage layer and used by the end user and for visualizations. Another key feature required to be a platform, rather than just a service, is the ability to externalize the data so it can be used for different calculations outside the scope of an IoT telemetry platform.
Such use cases might include training machine learning models, performing more advanced calculations or anomaly detection. As a whole, they rely on the data being available for such use, so it is important to extend the query layer with the ability to export data to external services.
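The two query layer ideas described above, date partitioned row keys and an external reverse index over row metadata, can be sketched as follows; the key layout and metadata fields are illustrative assumptions rather than a final schema.

```python
# Sketch of date partitioned row keys plus a reverse index that maps searchable
# metadata to row keys, so queries avoid full table scans. The key layout and
# tag names are illustrative assumptions.
from collections import defaultdict


def row_key(device_id, metric, day):
    # e.g. "sensor-001#temperature#2019-06-01" -> one partition per metric per day
    return f"{device_id}#{metric}#{day}"


class ReverseIndex:
    """Maps metadata tags (e.g. location=plant-a) to the row keys that carry them."""

    def __init__(self):
        self._index = defaultdict(set)

    def add(self, key, tags):
        for tag, value in tags.items():
            self._index[(tag, value)].add(key)

    def lookup(self, tag, value):
        return self._index[(tag, value)]


# Ingestion registers each new row key once:
#   idx.add(row_key("sensor-001", "temperature", "2019-06-01"), {"location": "plant-a"})
# The query layer then fetches only the matching partitions:
#   keys = idx.lookup("location", "plant-a")
```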
Current State
Before designing and implementing the IoT telemetry platform, it is important to evaluate existing options, both to see how they perform and what their architectural solutions are, and to find room for further improvement. As IoT is relatively new to the market, not many solutions exist at the moment. At the time of writing, the large cloud providers, such as Google with its Cloud Platform, Amazon with AWS and Microsoft with Azure, are only providing partial solutions to the problem, and complete offerings have yet to be implemented. However, some more complete solutions exist in the form of both commercial and open source projects. Commercial solutions do not offer direct insight into their operations and architecture, but by looking at the documentation and limitations of the system we can evaluate what some of the issues might be. Open source solutions, on the other hand, offer direct insight into their operations and architecture, and are sometimes well documented and evaluated.
One of the leading commercial solutions is currently offered by Losant, which advertises itself as a complete IoT platform with features that extend beyond telemetry. However, data collection and presentation is still their core and main service. For evaluating the platform, their documentation is used as the main resource, as it gives insight into many technical details of the platform. A quick overview of the available options shows that they support both MQTT and HTTPS as the protocol of choice, with TLS encryption on top of both as a data privacy measure. The documentation also shows that they use the Google Cloud Platform as the infrastructure provider and rely on its persistent disk offerings as an assurance of secure data storage, both in terms of privacy and fault tolerance, as Google offers high data integrity guarantees. So far Losant follows the market standard in terms of infrastructure and features. Another section lists usage limits and quotas, which give more insight into what their bottlenecks might be. They impose a rate limit of 30 messages per 15 second sliding window per device, which means on average every device can send at most 2 messages per second. While this is sufficient for most generic purposes, high frequency applications such as industrial operations tracking have significantly higher throughput requirements. Another limitation is the message size of 64 kilobytes, which is fairly generous for the application at hand. However, the message format is somewhat restrictive for general purpose applications, as each device has to have a preset format defined before its messages can be accepted, which is a limiting factor when deploying new and potentially different devices, especially if their configurations are subject to external factors which might alter the messaging requirements on the fly [15].
As an alternative to the commercial platform, there is the open source platform Thingsboard, which is also a full IoT platform. Since this platform is open source and openly documented, we can investigate in more detail how the architecture is organized and how it functions. Starting with the initial choices, Thingsboard supports all the major communication protocols, namely MQTT, CoAP and HTTP, and it supports TLS as a security layer for secure connections. Since it is an open platform, the architecture is also documented. It is supported on almost all major platforms through different technological solutions, so it can be deployed on any mainstream platform, including personal computers for small scale deployments, as well as in the cloud for a more performant system. The platform is horizontally scalable and can be deployed in clusters to expand its capacity. However, a noticeable architectural drawback is that the platform only handles data collection, querying and visualization, with no processing or direct storage capabilities. It is built as a monolithic application that runs a complete copy of the server on every deployed unit; this thesis addresses the drawbacks of such design choices in a later chapter in more detail. The storage layer has to be provided by the end user of the platform, who has to deploy it on their own and manage fault tolerance and performance optimizations. Advanced data processing is also not possible unless the system is manually extended. While clustering capabilities do exist, the process of creating, connecting and coordinating these clusters also falls to the operator of the platform and requires a separate system to manage it. Since this is not a commercial solution, the system does not directly restrict data throughput; it is rather limited by hardware capabilities. In their own performance tests, a single server can accept about 10 thousand messages per second, which by current standards is not very efficient, as simple MQTT brokers can easily accept messages at rates above 15 thousand per second. This maximum throughput is based on the largest available instance from leading cloud providers at the time of testing and assumes that the storage layer is not the bottleneck. This indicates internal inefficiencies which need to be further addressed [16].
System Design
System design is an entire field of study, especially in software engineering, where it is heavily relied upon to implement complex systems. The general idea of system design revolves around the careful evaluation of problems and options so that the system achieves the desired results or, where that is not completely possible, comes as close as possible to an optimal solution. However, for efficient system design it is important not only to evaluate solutions to the problems at hand but also to design the process of implementing and maintaining those solutions. For an IoT telemetry platform the general process is no different; the following sections therefore dissect the system design evaluation of each individual component and address the technical implementation details.
Before addressing any components of the system, it is important to understand that a successful system design must consider two key principles. The first is efficiency, in terms of what is consumed compared to what is produced; most often this revolves around the cost effectiveness of the system and its implementation in terms of time, money and other required resources. The other is effectiveness, or the extent to which the solution covers the stated problem; in this case it represents the performance, scalability, fault tolerance and timeliness of the platform. Since the system is fairly complex and has several major subcomponents, each one will be addressed in detail in a separate section.
General Architecture
This section discusses the overall architecture of the system. Before diving into individual parts, a global view of the system must exist. The general architecture of a system often depends on the people involved and the time at which it is designed, and as such it is a matter of the author's choice. As technology advances, design patterns change, and since this thesis works on a new solution for a new and emerging problem, the overall system is designed as a modular architecture with clear separation of concerns between individually functioning layers. The implementation uses a concept known as microservices, where small functional units are separately developed, managed and deployed and, with some coordination and configuration, work as a complete system. The rationale behind such an architecture is that a very large system needs to be implemented using a divide and conquer approach. This allows the concerns of different parts of the system to be separated, exposing only the minimum amount of implementation detail needed for the system to work; layers can easily be exchanged as better solutions become available, and several different technologies can be mixed without complicated bridges and integrations. Another design choice is to leverage services from cloud providers to offer a cost effective platform which can also be easily managed. As the author has previous experience with the Google Cloud Platform, it will be the platform of choice for development and deployment. Notably, other cloud providers such as Amazon and Microsoft offer very similar products and solutions and can, with some work, be substituted as the underlying platform. However, the implementation will pay special attention to the portability of the system and will only rely on external services when they present a more cost effective solution due to economies of scale [17].
Data Ingestion
As discussed in previous sections, there are multiple parts of the data ingestion process for an IoT telemetry platform. The first step in designing and implementing the data ingestion process is evaluating the communication protocols and security mechanisms both in terms of privacy and delivery guarantees. The industry leading options are HTTP(S), MQTT and CoAP.
HTTP(S) is a well known and widely used protocol for communication on the internet. However, its main application is serving large content from servers to consumers, and it is generally a one directional communication channel where the consumer sends a request for content and the server responds with the result. A popular option for using HTTP for machine to machine communication is the REST architectural style, where JSON is typically used as the content encapsulation mechanism and HTTP headers communicate the desired options for the specific interaction between client and server. Other communication architectures on top of HTTP exist, however most of them are no longer used and are being phased out, so they will not be covered. In terms of ease of use and implementation effort, HTTP is one of the easiest to use, especially given the simple addition of a security layer using SSL or TLS, and it has strong delivery guarantees from the protocol itself, as every interaction is acknowledged. However, as a protocol designed for easy consumption both by machines and by humans, and for serving larger amounts of content in a single request, it has significant drawbacks for smaller and more machine optimized interactions. JSON is a verbose form of sending data and HTTP has significant header overhead, especially after introducing TLS, with one of its major drawbacks being the constant setting up and tearing down of connections. The performance hit from this additional overhead is twofold. First, IoT clients are generally power and network restricted and need to avoid heavy interactions, especially in high frequency applications. Second, there is a performance and cost consideration on the server side: since HTTP is a heavy protocol, a single server unit can support only a limited number of connections in any given time frame, and while horizontal scaling will be implemented, it is not very cost effective, as more servers are needed for the same ingestion capacity compared to more specialized protocols. The last problem with HTTP is that it was designed for the client server architecture and is not well suited to swarm communication between independent nodes of a system.
The second option, and the most mainstream solution for telemetry data transport, is MQTT. This is a very lightweight protocol designed specifically for efficient transfer of measurement data and as a messaging protocol between different devices. The key benefits of such a purpose designed protocol are its very low overhead, payload agnosticism and simplicity, which make it performance effective, and it offers a bidirectional communication channel, with the most common implementation being a pub/sub architecture serving more as a messaging platform than a content server. As a specialized protocol it is better than HTTP in almost every aspect for the task at hand. It also provides configurable delivery guarantees based on the selected QoS level and is better suited for real time applications, as it is based on a push mechanism instead of pull and can further capitalize on persistent connections by reusing existing channels, even with TLS as a security layer.
Finally, there is CoAP, which represents a more hybrid approach, with goals more aligned with MQTT than with HTTP. It is also purpose built for machine to machine communication with efficiency in mind and is likewise payload agnostic. It features some of the simplicity patterns of HTTP and has a communication architecture similar to REST. However, it only offers payload encryption and is not deemed as secure as the other protocols, which employ industry standard security measures. Lastly, it is a very new protocol and has yet to prove effective and gain commercial acceptance.
After evaluating the initial options, MQTT emerges as the protocol of choice, since it is purpose built for the task at hand and is the industry standard. However, HTTP and CoAP should not be neglected, as they can be solid alternatives that might be interesting to some users [18].
The ingestion server is the entry point of all data and is the first concrete implementation detail of the telemetry platform. Since MQTT was chosen as the base protocol, an MQTT server, also known as a broker, needs to be implemented. The responsibility of the broker is to relay messages from publishers to subscribers, handle incoming connection requests and messages, and keep track of individual clients. The implementation of the broker has two requirements: to be horizontally scalable and highly efficient, yet well integrated into our solution. Evaluating the current leading solutions, such as HiveMQ as a leading commercial product, we encounter the first barrier of insufficient extensibility. Turning to open source solutions that could potentially be extended, we find either solutions that are not ready for large scale usage, are not easily horizontally scalable and are not performant, or solutions that are far too complex for their purpose and hard to integrate into a larger system, such as RabbitMQ. With that in mind, the final option is to implement our own lightweight solution that we can then fit to our needs. As MQTT is a very simple protocol, this is not too hard to achieve and is thus a very cost effective way to solve the problem without introducing too much technical debt.

The challenge here is providing horizontal scalability. Since a shared state needs to be provided across the cluster, a shared high performance memory bus is needed which can be used to share state between nodes and to provide fault tolerance at the memory level. The most widely adopted approach for this is to use a Redis cluster as a key value store acting as a backend memory system. Redis is a very powerful in memory key value database that offers many capabilities in terms of scaling and data redundancy. With this approach it is possible to create clusters of ingestion nodes that communicate via the Redis cluster, and even if an ingestion node fails, the state is stored in a global memory system which persists the data [19]. This approach allows the memory system to be scaled independently based on memory requirements, while the ingestion system itself is generally CPU bound, allowing both ingestion problems to be tackled cost effectively. However, such a solution needs a management system that can scale appropriately on its own without too much manual intervention. As a microservices approach is used, the tools for deploying and managing such clusters are Docker as the container engine, which allows individual container packages to be created for the ingestion and Redis nodes, and Kubernetes as the container orchestration tool to manage the cluster. The Google Cloud Platform offers native support for both, further increasing the usability of these tools. Kubernetes allows hardware requirements as well as scaling rules for automatic resource provisioning to be specified using preset configurations. This reduces the overhead of operations management and allows more resources to be devoted to implementing the platform itself [20].
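The shared state idea can be sketched as follows: per client session data, here just the topic subscriptions, lives in Redis rather than in the memory of a single ingestion node, so any node can serve any client and a failed node loses no state. The key names, data layout and the deliberately naive lookup are assumptions for illustration only.

```python
# Sketch of broker session state shared through Redis. Key names, layout and
# the deliberately naive subscriber lookup are illustrative assumptions.
import redis

r = redis.Redis(host="redis-cluster", port=6379)


def register_subscription(client_id, topic):
    # One Redis set per client holds its subscriptions, visible to every broker node.
    r.sadd(f"mqtt:subs:{client_id}", topic)


def subscribers_of(topic):
    # Naive scan over all client subscription sets, purely for illustration.
    for key in r.scan_iter("mqtt:subs:*"):
        if r.sismember(key, topic):
            yield key.decode().split(":", 2)[2]
```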
With the architecture of the ingestion system laid out and horizontal scaling as well as fault tolerance taken care of, a final part of the ingestion system needs to be addressed. After receiving data, a reliable system is needed for organizing the messages so that the consumers from the processing and analysis layers can read the data and continue the pipeline. As data integrity is vital in such a system, high durability must be offered for messages which are yet to be processed, and while the Redis cluster is great for sharing state and persisting messages for local use, it is not well suited for organizing workload distribution and management, at least not in the current configuration. For a persistent and durable work queue, an additional mechanism is needed to handle the interactions between the ingestion and processing layers. At the time of writing, the best suited solution is Kafka, which is in general a pub/sub system, just like MQTT and Redis, but focused on data persistence on disk with strong commit logs aimed at high volume throughput. It is ideal for this use case, as it is natively meant for the application at hand, with clustering support, strong data guarantees, high throughput and work distribution. However, Kafka requires significant management and is more complex to deploy than a single application, and as such carries significant development and operational overhead. The alternative is to use a hosted service from Google named Cloud Pub/Sub. This product from our cloud provider of choice is a direct implementation of a system with the same principles, but as a hosted solution it strips away the management burden of implementing it on our own. Its main selling point is that the cost of such a system drops significantly at large scale, so using the hosted solution is also much more cost effective than hosting our own [21]. While Cloud Pub/Sub will be used in this implementation as the better design choice, it is important to note that the system can be substituted at any moment, so there is no vendor lock in. This is also possible due to the layered microservice architecture, which allows flexibility in the internal implementation details of a single part of the system. The integration is done by forwarding all messages received on the ingestion nodes into Cloud Pub/Sub before acknowledging them, thus making sure the data is persisted for processing. Another reason for employing a secondary mechanism in the ingestion process is to allow messages to reside somewhere for a longer period of time; by not being memory bound, the system can survive very large traffic spikes until the worker nodes catch up and scale up, and in case of a worker failure, the messages will be persisted long enough to employ recovery mechanisms and continue processing.
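The hand off itself can be sketched as follows: the ingestion node publishes each incoming MQTT payload to Cloud Pub/Sub and only acknowledges the device once the publish has been confirmed, preserving at least once delivery end to end. The project and topic names are placeholders, and the surrounding broker code is omitted.

```python
# Sketch of the hand off step: forward each MQTT message to Cloud Pub/Sub and
# only acknowledge the device once the publish is confirmed. Project and topic
# names are placeholders.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "raw-telemetry")


def hand_off(payload, device_id):
    """payload: raw message bytes exactly as received from the device."""
    future = publisher.publish(topic_path, data=payload, device_id=device_id)
    future.result()  # blocks until Pub/Sub has durably accepted the message
    # Only now is the MQTT acknowledgement (PUBACK) sent back to the device.
```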