Browse Source

Simple pub/sub system.

Beoran 3 years ago
parent
commit
8dac2c83af
2 changed files with 335 additions and 0 deletions
  1. 1 0
      src/upubsub.c
  2. 334 0
      src/upubsub.h

+ 1 - 0
src/upubsub.c

@@ -0,0 +1 @@
+#include "upubsub.h"

+ 334 - 0
src/upubsub.h

@@ -0,0 +1,334 @@
+/* upubsub: a minimal publish/subscribe 1 header file library for pure ANSI C. */
+#ifndef UPUBSUB_H_INCLUDED
+#define UPUBSUB_H_INCLUDED
+
+#ifndef UPUBSUB_LISTENER_AMOUNT
+#define UPUBSUB_LISTENER_AMOUNT 64
+#endif /* UPUBSUB_LISTENER_AMOUNT */
+
+#ifndef UPUBSUB_MESSAGE_SIZE
+#define UPUBSUB_MESSAGE_SIZE 64
+#endif /* UPUBSUB_MESSAGE_SIZE */
+
+#ifndef UPUBSUB_TOPIC_AMOUNT
+#define UPUBSUB_TOPIC_AMOUNT 32
+#endif /* UPUBSUB_TOPIC_AMOUNT */
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+
+
+struct upubsub_message {
+	const char * topic;
+	size_t	  size;
+	uint8_t   copy[UPUBSUB_MESSAGE_SIZE];
+	int	  copy_ok;
+	void    * data;
+};
+
+struct upubsub_listener;
+struct upubsub;
+
+typedef int (upubsub_listen_func)(struct upubsub_listener *l, struct upubsub_message m);
+
+struct upubsub_listener {
+	struct upubsub * upubsub;
+	unsigned int id;
+	const char * topic;
+	void * data;
+	upubsub_listen_func * listen;
+};
+
+struct upubsub_topic {
+	const char * topic;
+	size_t size;
+	struct upubsub_listener * first;
+};
+
+struct upubsub {
+	size_t size;
+	unsigned int last_id;
+	struct upubsub_listener listeners[UPUBSUB_LISTENER_AMOUNT];
+	size_t topics_size;
+	struct upubsub_topic topics[UPUBSUB_TOPIC_AMOUNT];
+};
+
+
+int upubsub_listener_cmp_topic_id(const void * v1, const void * v2);
+int upubsub_listener_cmp_topic(const void * v1, const void * v2);
+int upubsub_listener_cmp_id(const void * v1, const void * v2);
+struct upubsub_listener * upubsub_subscribe_listener(struct upubsub * u, struct upubsub_listener l);
+struct upubsub_listener* upubsub_subscribe(struct upubsub * u, const char * topic, void * data, upubsub_listen_func * listen);
+int upubsub_publish_message(struct upubsub u, struct upubsub_message m);
+int upubsub_publish_data(struct upubsub u, const char * topic, void * data, size_t size);
+int upubsub_publish_str(struct upubsub u, const char * topic, char * str);
+int upubsub_unsubscribe_listener(struct upubsub * u, struct upubsub_listener * l);
+
+#endif /* UPUBSUB_H_INCLUDED */
+
+#ifdef UPUBSUB_TEST
+#define UPUBSUB_IMPLEMENTATION
+#endif
+
+#ifdef UPUBSUB_IMPLEMENTATION
+
+int bsearch_low_index(const void *key, const void *base, size_t nel, size_t width, int (*cmp)(const void *, const void *)) {
+	void *try;
+	int sign;
+    int found;
+    int low = 0;
+    int high = nel;    
+	while (low <= high) {
+        int mid = low + (high - low) / 2;
+        try = (char*)base + (mid*width);
+		sign = cmp(key, try);
+		if (sign <= 0) {
+			high = mid - 1;
+		} else {
+			low = mid + 1;
+		}
+	}
+	return low;
+}
+
+int bsearch_high_index(const void *key, const void *base, size_t nel, size_t width, int (*cmp)(const void *, const void *)) {
+	void *try;
+	int sign;
+    int found;
+    int low = 0;
+    int high = nel;    
+	while (low <= high) {
+        int mid = low + (high - low) / 2;
+        try = (char*)base + (mid*width);
+		sign = cmp(key, try);
+		if (sign < 0) {
+			high = mid - 1;
+		} else {
+			low = mid + 1;
+		}
+	}
+	return high;
+}
+
+
+int upubsub_listener_cmp_topic_id(const void * v1, const void * v2) {
+	int res;
+	struct upubsub_listener *l1, *l2;
+	l1 = (struct upubsub_listener *)(v1);
+	l2 = (struct upubsub_listener *)(v2);
+
+	/* Sort with null topic as last, as these ones are unused. */
+	if ((!l1->topic) && (l2->topic)) { return 1;  }
+	if ((!l2->topic) && (l1->topic)) { return -1; }
+	if ((!l2->topic) && (!l1->topic)) { return 0; }
+     	/* Otherwise compare topics. */
+	res = strcmp(l1->topic, l2->topic);
+	/* If the same sort by id to try to keep the sort stable. */
+	if (!res) {
+		return l1->id - l2->id;
+	}
+	return res;
+}
+
+int upubsub_topic_cmp_topic(const void * v1, const void * v2) {
+	struct upubsub_topic *l1, *l2;
+	l1 = (struct upubsub_topic *)(v1);
+	l2 = (struct upubsub_topic *)(v2);
+
+	/* Sort with null topic as last, as these ones are unused. */
+	if ((!l1->topic) && (l2->topic)) { return 1;  }
+	if ((!l2->topic) && (l1->topic)) { return -1; }
+	if ((!l2->topic) && (!l1->topic)) { return 0; }
+     	/* Otherwise compare topics. */
+	return strcmp(l1->topic, l2->topic);
+}
+
+int upubsub_listener_cmp_topic(const void * v1, const void * v2) {
+	struct upubsub_listener *l1, *l2;
+	l1 = (struct upubsub_listener *)(v1);
+	l2 = (struct upubsub_listener *)(v2);
+
+	/* Sort with null topic as last, as these ones are unused. */
+	if ((!l1->topic) && (l2->topic)) { return 1;  }
+	if ((!l2->topic) && (l1->topic)) { return -1; }
+	if ((!l2->topic) && (!l1->topic)) { return 0; }
+     	/* Otherwise compare topics. */
+	return strcmp(l1->topic, l2->topic);
+}
+
+int upubsub_listener_cmp_id(const void * v1, const void * v2) {
+	struct upubsub_listener *l1, *l2;
+	l1 = (struct upubsub_listener *)(v1);
+	l2 = (struct upubsub_listener *)(v2);
+
+	/* Sort with null topic as last, as these ones are unused. */
+	if ((!l1->topic) && (l2->topic)) { return 1;  }
+	if ((!l2->topic) && (l1->topic)) { return -1; }
+	if ((!l2->topic) && (!l1->topic)) { return 0; }
+	/* Compare by id to find the exact listener. */
+	return l1->id - l2->id;
+}
+
+struct upubsub_listener * upubsub_subscribe_listener(struct upubsub * u, struct upubsub_listener l)  {
+	struct upubsub_listener * res;
+	if ( u->size >= UPUBSUB_LISTENER_AMOUNT ) {
+		return NULL;
+	}
+	u->last_id++;
+	l.id = u->last_id;
+	l.upubsub = u;
+	u->listeners[u->size] = l;
+	res = u->listeners + u->size;
+	u->size ++;
+	qsort(u->listeners, u->size, sizeof(struct upubsub_listener), upubsub_listener_cmp_topic_id);
+	return res;
+}
+
+struct upubsub_listener* upubsub_subscribe(struct upubsub * u, const char * topic, void * data, upubsub_listen_func * listen) {
+	struct upubsub_listener l;
+	l.topic = topic;
+	l.data = data;
+	l.listen = listen;
+	return upubsub_subscribe_listener(u, l);
+}
+
+
+int upubsub_publish_message(struct upubsub u, struct upubsub_message m) {
+	int sent = 0;
+	size_t start, index;
+	struct upubsub_listener * found = NULL;
+	struct upubsub_listener key;
+    int low, high;
+
+	key.topic = m.topic;
+	key.id = 1;    
+	low = bsearch_low_index(&key, u.listeners, u.size, sizeof(struct upubsub_listener), upubsub_listener_cmp_topic);
+    high = bsearch_high_index(&key, u.listeners, u.size, sizeof(struct upubsub_listener), upubsub_listener_cmp_topic);
+	if (high < low) {
+		return sent;
+	}	
+	for (index = low; index <= high; index ++) {
+		found = u.listeners + index;
+		found->listen(found, m);
+		sent ++;
+	}
+	return sent;
+}
+
+int upubsub_publish_data(struct upubsub u, const char * topic, void * data, size_t size) {
+	struct upubsub_message m = { 0 };
+	m.data = data;
+	m.size = size;
+	m.copy_ok = 0;
+	if (m.size < UPUBSUB_MESSAGE_SIZE) {
+		memcpy(m.copy, data, m.size);
+		m.copy_ok = 1;
+	}
+	m.topic = topic;
+	return upubsub_publish_message(u, m);
+}
+
+int upubsub_publish_str(struct upubsub u, const char * topic, char * str) {
+	return upubsub_publish_data(u, topic, str, strlen(str));
+}
+
+
+int upubsub_unsubscribe_listener(struct upubsub * u, struct upubsub_listener * l)  {
+	struct upubsub_listener * found = NULL;
+
+	found = bsearch(l, u->listeners, u->size, sizeof(struct upubsub_listener), upubsub_listener_cmp_id);
+	if (!found) {
+		return -1;
+	}
+	/* Set to unused. */
+	found->id = 0;
+	found->topic = NULL;
+        /* Unused will be sorted to the back. */
+	qsort(u->listeners, u->size, sizeof(struct upubsub_listener), upubsub_listener_cmp_topic_id);
+	u->size--;
+	return 0;
+}
+
+int upubsub_unsubscribe(struct upubsub_listener * l)  {
+	return upubsub_unsubscribe_listener(l->upubsub, l);
+}
+
+#endif /* UPUBSUB_IMPLEMENTATION */
+
+
+
+#ifdef UPUBSUB_TEST
+
+#include <stdio.h>
+
+int listen_print_str(struct upubsub_listener *l, struct upubsub_message m) {
+	printf("received on topic %s: data %s, %s\n", l->topic, (char*)(l->data), (char *)(m.data));
+	return 0;
+}
+
+int compare_int(const void * key, const void * mem) {
+    int * i1 = (int*)key;
+    int * i2 = (int*)mem;
+    return (*i1) - (*i2);
+}
+
+int main(void) {
+	struct upubsub pusu = { 0 };
+	struct upubsub_listener *l[10];
+    int arr[] = { 7, 8 , 8, 9, 9, 9, 10, 11, 12, 20, 20, 32 };
+    int low, high;
+    int key = 9;
+    low = bsearch_low_index(&key, &arr, 12, sizeof(int), compare_int);
+    high = bsearch_high_index(&key, &arr, 12, sizeof(int), compare_int);
+    printf("Low, high: %d -> %d...\n", low, high);
+    if ((low > 0) && (low < 12)) {
+        printf("Low: %d\n", arr[low]);
+    }
+    if ((high > 0) && (high < 12)) {
+        printf("High: %d\n", arr[high]);
+    }
+    
+    for (key = 0; key < 40; key ++) {     
+        low = bsearch_low_index(&key, &arr, 12, sizeof(int), compare_int);
+        high = bsearch_high_index(&key, &arr, 12, sizeof(int), compare_int);
+        printf("Key: %d; Low, high: %d -> %d:", key, low, high);
+        if ((low >= 0) && (low < 12)) {
+            printf(" Low: %d", arr[low]);
+        }
+        if ((high >= 0) && (high < 12)) {
+            printf(" High: %d", arr[high]);
+        }
+        printf("\n");
+    }
+    
+    
+
+	printf("Subscribing...\n");
+	l[0] = upubsub_subscribe(&pusu, "POWER" , "SYST", listen_print_str);
+	l[1] = upubsub_subscribe(&pusu, "POWER" , "APP1", listen_print_str);
+	l[2] = upubsub_subscribe(&pusu, "SYSTEM", "APP2", listen_print_str);
+	l[3] = upubsub_subscribe(&pusu, "SYSTEM", "WIND", listen_print_str);
+
+	printf("Publishing...\n");
+	upubsub_publish_str(pusu, "POWER", "DOWN");
+	upubsub_publish_str(pusu, "POWER", "UP");
+	upubsub_publish_str(pusu, "SYSTEM", "CLICK(7,8)");
+	upubsub_publish_str(pusu, "SYSTEM", "ROLL(9,5)");
+	upubsub_publish_str(pusu, "NULL", "IGNORE");
+
+	printf("Removing topics...\n");
+	upubsub_unsubscribe(l[1]);
+	upubsub_unsubscribe(l[3]);
+
+	printf("Publishing...\n");
+	upubsub_publish_str(pusu, "POWER", "DOWN 2");
+	upubsub_publish_str(pusu, "POWER", "UP 2");
+	upubsub_publish_str(pusu, "SYSTEM", "CLICK(7,8) 2");
+	upubsub_publish_str(pusu, "SYSTEM", "ROLL(9,5) 2");
+	upubsub_publish_str(pusu, "NULL", "IGNORE");
+
+	return 0;
+}
+
+#endif /* UPUBSUB_TEST */