1c + kafka.apache

Публикация № 990211

Программирование - Практика программирования

архитектура python kafka микросервисы backend TCP HTTP

93
Пример построения микросервисов с использованием apache kafka. Данная статья будет полезна интеграторам, программистам. Версия и релиз технологической платформы не имеет значения.

Пример построения микросервисов с использованием apache kafka.

Задача:

Построить событийную архитектуру между приложениями(микросервисы)(не только 1с).

Общая логика решения задачи():

1. Приложение А генерирует событие                                                         

2. Другое приложение Б(В, Г..) "слушает"  кафка (подписывается на событие)(если коротко)

3. Приложение А отправляет данные кафка (Поставщик)

4. кафка отправляет данные всем кто подписан на это событие (Потребитель)

5. Приложение Б(В, Г..)  получает данные

Технологии:

1. 1с (поставщик,  потребитель, tcp клиент)

2.  Разнородный бэкенд (python,  и тд) (поставщик,  потребитель)

3. с++ 1с вк native (транспорт для tcp клиента 1с)

4. apache kafka (шина данных) https://kafka.apache.org

5. flask (python micro web сервер) http://flask.pocoo.org

6. tcp сервер (python)

7. KafkaConsumer, KafkaProducer  (python) https://github.com/dpkp/kafka-python

Общая принцип работы(модули):

1с(1):

При запуске 1с стартует tcp клиент (вк native)(3) .Устанавливает соединение с tcp сервером(6) (передавая ид пользователя из 1с). Подписывается на событие вк native. При наступления события уходит в контекст 1с через ОбработкаВнешнегоСобытия для последующей обработки. Для генерации событий выполняет гет, пост запросы на  веб сервер(5).

tcp сервер(6):

Хеш таблица открытых сокетов(активных соединений) и ид пользователя. Подписывается на события шины данных для отправки клиенту 1с. При наступлении событий отправляет данные в сокет(клиенту 1с). Генерирует события при авторизации(установка сокета), выхода(разрыв, закрытие сокета) в шину данных.

web сервер(5):

Обрабатывает запросы со стороны 1с(прокси для кафка) и отправляет в шину данных. Также подписывается на события сторонних приложений требующих отправку данных для 1с(изменился статус звонка asterisk, прилетел тригер для обновления формы, перегнать что то с &НаСервере на &НаКлиенте , и тд).

Шина данных(4):

Управляет оркестром приложений(упорядоченная очередь). При наступлении события маршутизирует подписчикам, если подсписчиков нет, хранит у себя, доставит потом когда появятся.

Разнородный бэкенд(2):

Подписывается, генерирует события в шину данных.(например web app time line(журнал записи), asterisk ami client(автообзвон, фонер), из предыдущих статей)

Шаг 1. tcp сервер 

 
 tcp сервер

установка сокета(прослушка):
while True:

    try:
        buf = request.recv(256).decode('utf-8')
    except:
        break

отправка в сокет:

sock.sendall(message.value)

генерация события для шины данных: 

producer.send(f'user_auth', key=key, value=value) 

подписка,обработка события:

consumer = KafkaConsumer('ones_socket_send_user', 'ones_socket_send_user_send_all', 'ones_socket_add_listener', 
                                 group_id='my-group', bootstrap_servers=['192.168.777.555:9092']) 
for message in consumer:
    if message.topic=="ones_socket_send_user":

Шаг 2. web сервер

 
 start
 
 route
 
 app

регистрация енд поинт(точка входа, путь и тд):

@app.route('/api_user_event',  methods=['GET', 'POST'])
def api_user_event(): ...

Шаг 3. Пример генерации из стороннего приложения

 
 time line triger update
 
 asterisk ami event triger
def event_listener(event, **kwargs):
    if event.name in EVENTS_NOT_LISTEN:
        return
    
    #print(f"""{str(event)}""")
    
    if event.name == 'Newstate' or event.name == 'Hangup' or event.name == 'VarSet':
        if event.name == 'VarSet':
            if not event.keys['Variable'] in EVENTS_VARS_LISTEN:
                return

        for x in USER_EVENTS:
            if USER_EVENTS[x]['channel'] in event.keys['Channel']:
                d = {}
                
                d['Действия'] = 'Asterisk_Event' 
                d['Данные'] = event.keys 
                d['Данные']['name'] = event.name 
                d['Данные']['id_ext'] = USER_EVENTS[x]['id_ext'] 
                d['Данные']['event_date'] = datetime.now()
                
                jdata = json.dumps(d, default=json_serial)
                
                do_user_event(jdata, USER_EVENTS[x]['user'])

                print(f"""{str(event)}""")

Прослушка ами:

client = AMIClient(address=AMI_ADDRESS, port=AMI_PORT)
future = client.login(username=AMI_USER, secret=AMI_SECRET)

client.add_event_listener(event_listener)

 

Шаг 4. 1с

 
пример кода для работы

Шаг 5. вк native

Полное создание вк native в статье не рассматривается.

 
(часть tcp клиента)

Шаг 6. 1c web services

Используется для передачи сообщения из бизнес приложения для отправки в 1с (серверная часть)(для дальнейшей обработка бизнес логики в контексте 1с) (создание документа, старт бизнес процесса, заполнение справочника) (конечно можно сделать сделать прямой вызов без использования сообщений, здесь идея в том что на один ответ 1c ws может быть в дальнейшем подписано несколько подписчиков, тем самым все получат сообщение и будут выполнят свою логику далее)      

 
1с ws consumer

Подключение к ws:

client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)

Вызов метода в контексте 1с(метод ws сервиса):

eval_str = 'client.service.%s(%s)' % (method, jdata and "%r" % jdata or '')
    
    try:
        res = eval(eval_str)
    except Exception as e:
        print(e)
        return

Конструкция вида: upd_event_lists.append([]), и затем  _thread.start_new_thread(upd_loop, (i,)), сделано для того чтобы не завалить ws 1c. Те может случится так что прилетит много запросов и если делать send сразу происходит отказ в установке соединения (Удаленный компьютер отверг запрос на подключение, еррор вин сок и тд.)(проверено примерно на 1000 сообщения поданных вход модулю который выполнял _thread.start_new_thread(send, (username, password, jdata, method, ws,)) для каждого принятого сообщения, сгенерировав их в кафку предварительно с другого модуля, произошел отказ(пользователей было 400 на момент приема сообщений) ну и 1с чета затупила сразу, в плане установки новых соединений с рпхостом(рагент сдох короче)).

 
 1с http consumer

 

import json
from kafka import KafkaProducer, KafkaConsumer
#import _thread
import time
import requests
from requests.auth import HTTPBasicAuth


headers = {'Content-type': 'application/json'}
           #'Accept': 'text/plain'}


ONES_USER = ''
ONES_PASSWORD = ''
ONES_HOST = '192.168.555.560'
ONES_HTTP = 'http://192.168.555.560/AA'

consumer = KafkaConsumer('ones_http_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])

producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])

for message in consumer:
    print(f"""ones_http_event->{message}""")

    method = message.key.decode('utf-8')
    
    data = message.value.decode('utf-8')

    jdata = data
    
    username = ONES_USER
    password = ONES_PASSWORD

    data = json.loads(data)

    id_request = None

    if not data.get('id_request') is None:
        id_request = data.get('id_request')
    
    if not data.get('event_setting') is None:
        event_setting = data.get('event_setting')
        
        username = event_setting.get('username')
        password = event_setting.get('password')

        jdata = event_setting.get('jdata')
        
        if not jdata.get('id_request') is None:
            id_request = jdata.get('id_request')

    auth_ones = HTTPBasicAuth(username, password)
    
    r = None

    try:
        r = requests.get(f"""{ONES_HTTP}/hs/gate/v1?method={method}&data={jdata}""", headers=headers, auth=auth_ones)
            #data=res_str, 
    except Exception as e:
        print(e)
    
    res = ''
    
    if not r is None:
        if r.status_code != 200:
            res = r.reason
            print(r.reason)
        else:
            res = r.text

    if not id_request is None:
        key = id_request.encode('utf-8')
        value = res.encode('utf-8')
        
        producer.send('ones_http_event_response', key=key, value=value)

 

в 1с это http сервис. Это работает быстрее ws, потому что ws это soap over http. Чтобы понять нужно cмотреть в структуру вызовов клиент-сервер SOAP.(получение wsdl, вызов метода soap и тд). http сервис - это голый http - get/post/put/head

Примеры работы:

1c посылает гет во внешнее приложение->внешнее приложение получает событие, обрабытвает генерирует событие на обновление клиента 1с отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.

1c посылает гет, пост на веб сервер->веб сервер отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.

Пример отправки и получения 500 запросов + в процессе отправки генерация из внешних приложений.

Пример подключения/отключения в процессе отправки.

 

 

Деплой и идеология apache kafka в статье не рассматривается(написано тысячи статей).

Для мониторинга apache kafka используется https://github.com/yahoo/kafka-manager.

 

Метрика:

upd для тех кто мало что понял мануал для dot net от ms, смотреть в общие принципы и идеи

 

93

Скачать файлы

Наименование Файл Версия Размер
вк native
. 135,50Kb
28.01.19
4
. 135,50Kb 4 Скачать

См. также

Специальные предложения

Комментарии
Избранное Подписка Сортировка: Древо
1. TreeDogNight 15 28.01.19 09:29 Сейчас в теме
С помощью какой программы снимали GIF ?
vrednyi_glavred; UniversaLL; +2 Ответить
6. KroVladS 23 28.01.19 15:10 Сейчас в теме
(1)
С помощью какой программы снимали GIF ?

Судя по ватермаркам автор пользовался Icecream Screen Recorder
мне больше нравится LICEcap
dour-dead; +1 Ответить
11. dmarenin 217 28.01.19 16:32 Сейчас в теме
(1) Icecream Screen Recorder дальше в гугле convert web m to gif
2. badboychik 60 28.01.19 10:38 Сейчас в теме
ну хоть внешнюю компоненту-то выложите ((
KroVladS; AntonSm; UniversaLL; rintik; +4 Ответить
31. dmarenin 217 28.01.19 17:05 Сейчас в теме
3. KroVladS 23 28.01.19 12:17 Сейчас в теме
Статья для того чтобы похвастаться?
Примеры чтобы самим потестить где?
12. dmarenin 217 28.01.19 16:32 Сейчас в теме
4. badboychik 60 28.01.19 12:24 Сейчас в теме
Чтобы скомпилить одну ДЛЛ-ку надо поставить 2 гигабайтную студию. ОК.
Запускаю билд примера с ИТС - 211 ошибок при сборке. WTF??
19. dmarenin 217 28.01.19 16:43 Сейчас в теме
(4) ну дак вин сдк нужно для версии длл, иначе как?
20. badboychik 60 28.01.19 16:47 Сейчас в теме
(19) Классно, как раз хотел закачать еще 3 ГБ библиотек и почпокаться с С++ первый раз в жизни
21. dmarenin 217 28.01.19 16:49 Сейчас в теме
(20) больше 3х если для крос
67. starik-2005 1794 25.03.19 18:20 Сейчас в теме
(4) бедняги подвиндозные )))
5. badboychik 60 28.01.19 14:00 Сейчас в теме
kafka-manager как поставить? там в описании указано что надо ставить Play Framework и компилить бинарник через sbt. Так?
13. dmarenin 217 28.01.19 16:33 Сейчас в теме
(5) нет есть путь короче имя ему докер хаб
22. badboychik 60 28.01.19 16:52 Сейчас в теме
(13) 6 тысяч результатов по "kafka", в первых нескольких нет инфы, что в них есть zkeeper, kafka-manager. Опять нет конкретики
30. badboychik 60 28.01.19 17:04 Сейчас в теме
(23) Было бы круто сделать свой докер со всеми упомянутыми средствами - zkeeper, kafka, kafka-man, flask, и одной командой разворачивать всю схему
32. dmarenin 217 28.01.19 17:08 Сейчас в теме
(30) zkeeper, kafka, kafka-man, flask, .... windows, 1c, excel, word, doom...

а если по делу суть контейнеров как раз так и заключается в разделении приложения

те.

вот есть кафка
есть кафка менеджер
есть бизнес поделки
вот они как бы вместе взаимодействуют.
оркестр

это мое мнение, и вот так вот мы делаем.
flask вообще пакет питона, ставится одной строкой в докер бизнес приложения(если в имидже его не было).
starik-2005; +1 Ответить
7. minimajack 53 28.01.19 15:30 Сейчас в теме
Какова производительность отправки?
18. dmarenin 217 28.01.19 16:40 Сейчас в теме
(7) в сравнении с чем? на гифках видно думаю при отправке 500 постов. с учетом того что штатными средствами (без использования бд) межсеансовая клиентская передача не возможна, считаю ваш вопрос не корректным. но суть не сколько во межсеансовой передаче, сколько во внешнем воздействии (события)
24. minimajack 53 28.01.19 16:54 Сейчас в теме
(18) сколько пакетов в секунду размером в 1 байт отправляется
25. dmarenin 217 28.01.19 16:56 Сейчас в теме
(24) сколько пакетов отправляется откуда?
26. minimajack 53 28.01.19 16:56 Сейчас в теме
33. dmarenin 217 28.01.19 17:19 Сейчас в теме
(26) добавил гифку по скорости передачи, там где с браузера передача идет (последняя гифка). это под рабочей нагрузкой.
34. minimajack 53 28.01.19 18:25 Сейчас в теме
(33) господи....цифру напишите - потолок.
Например отправляется 200 сообщений в секунду. Пиковая нагрузка полторы тыщи.
37. dmarenin 217 28.01.19 18:40 Сейчас в теме
28. dmarenin 217 28.01.19 16:59 Сейчас в теме
(24) там все над сишные, крестовые поделки(питон(консумер, продакшен кафка), 1с аддин), отсюда скорость.(ну да ртос далеко, ну так реалтайм до 1с, дальше конечно просадка). более 100 сокетов.сразу. к чему такой вопрос?
35. minimajack 53 28.01.19 18:27 Сейчас в теме
(28) как можно по гифке можно оценить скорость? Вот 100 сокетов - это уже полезно...значит на 100 пользователях возможно не упадет =)
Хочу оценить порядок скорости со своим велосипедом.
36. dmarenin 217 28.01.19 18:38 Сейчас в теме
(35) должно упасть? если и упадет кафка доставит чуть позже. см идеологию
38. minimajack 53 28.01.19 18:40 Сейчас в теме
(36) хорошо...
У меня стоит двадцать датчиков которые генерируют 100 событий в секунду. Смогу ли я отправлять в кафку 2000 сообщений из клиента 1С? А если да - сколько я могу поставить еще датчиков что бы уперется в предел отправки?
40. dmarenin 217 28.01.19 18:49 Сейчас в теме
(38) не упрется в том то и дело! что упорядоченная очередь. те как там в 1с прийдет дело 3 тье, главное их в стэк очередь. и если честно генерацию датчиков 2000 в сек наверно лучше не делать из 1с. в моем примере отправка 500 постов в первом случае, 5000 в другом на скринах, не упало.
starik-2005; +1 Ответить
39. dmarenin 217 28.01.19 18:41 Сейчас в теме
(36) по гифке? визуально явно там не байты ходят. вам метрика нужна? я могу замерить, потом окажется у вас железо не то, деплой не тот, нагрузка не та., и тд...
43. minimajack 53 28.01.19 18:58 Сейчас в теме
(39)
явно там не байты ходят. вам метрика нужна? я могу замерить, потом окажется у вас железо не то, деплой не тот, нагрузка не та

да какая разница какое железо...там в 100% упрется в 1С. По гифке 1 сообщение в секунду =)
46. dmarenin 217 28.01.19 19:03 Сейчас в теме
(43) да упрется в 1с так как карусельная виртуальная машина(выполнение потоков по карусели)
47. dmarenin 217 28.01.19 19:05 Сейчас в теме
(46) почему и высокую нагрузку в обход 1с и предлагал делать, а там куда запишет и как не столь важно. наверно имеет смысл после кафки писать в дб сразу. тогда показания будут корректными. на 1с только отображать
starik-2005; +1 Ответить
48. minimajack 53 28.01.19 19:10 Сейчас в теме
Вы нагрузочное тестирование вообще проводили?
51. dmarenin 217 28.01.19 19:18 Сейчас в теме
(48) отправки 20000 в сек сообщений с 2000 клиентов из 1с. конечно нет. а смысл ? в статье нет речи о посторении скады. тесты были для бизнес приложений и взаимодействия 1с. написано выше было более 100 открытых сокетов и пример передачи. какой вопрос? вы что хотите цифры? дак разверните у себя сделайте замер...
52. minimajack 53 28.01.19 19:41 Сейчас в теме
(51) а смысл кафки в вашем контексте? 100 открытых сокетов держит древний селерон с гигом оперативы...а 1 сообщение в секунду можно и в sqllite писать и читать...
Нагнать хайпа ? ))
53. dmarenin 217 28.01.19 19:46 Сейчас в теме
(52) смысл во внешнем воздействии сторонних приложений и обработкой в 1с. обработка ивента например на обновление формы, реакция на смену статуса звонка, реакция межсеансового события 1с клиента. sqllite писать и читать это возможно ваш подход, не мой. прочитайте статью от начала. просьба.
56. minimajack 53 28.01.19 20:06 Сейчас в теме
Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером)...что и где там дальше по цепочке впринцепе не важно...меняем кафку на реббит или базу данных - и ничего для 1С не меняется.
Вот в чем вопрос...почему кафка?? а ответа нет.
Так варианты:
1С + webserver + кафка
1C + регистр сведений + чтение периодическое
1С + native api + rabbit mq
57. dmarenin 217 28.01.19 20:08 Сейчас в теме
(56) ну давайте не будем про 1C + регистр сведений + чтение периодическое

1С + native api + rabbit mq

что за native сможет в туда и сюда(клиент и сервер)?
58. dmarenin 217 28.01.19 20:10 Сейчас в теме
(57) есть сорсы(крестовые) для натс под 1с(натив вк) стоит ли мерятся? не вижу смысла.
61. minimajack 53 28.01.19 20:31 Сейчас в теме
(58) а толку? Снова же: конфигурация не та...деплой говно, метрику не дам, нагрузочного тестирования не делал =))))
Soloist; DimaP; +2 Ответить
65. dmarenin 217 29.01.19 09:28 Сейчас в теме
(61) "конфигурация не та...деплой говно" - возможно, это будут ваши первые слова когда наши метрики не совпадут и у вас будет ниже, "метрику не дам" - дам,
"нагрузочного тестирования не делал" - вероятно делал
60. minimajack 53 28.01.19 20:28 Сейчас в теме
(57)
Вместо метрики - gif.
Вместо внятного объяснения выбора кафки - хрень с натсом.

Нормально бы написали:
Пользовался тем то - устраивало
Пользовался тем то - стало лучше
Пользовался тем-то - еще лучше и стабильней
Внедрил кафку - ваще огонь...

p.s. Yellow RabbitMQ - вроде может и отправлять и принимать, и на сервере и на клиенте. Если что я не от них =)
62. dmarenin 217 29.01.19 08:57 Сейчас в теме
(60) метрика:
Прикрепленные файлы:
63. dmarenin 217 29.01.19 09:15 Сейчас в теме
(60) еще метрика
Прикрепленные файлы:
64. dmarenin 217 29.01.19 09:26 Сейчас в теме
(60) еще метрика:

Хватит метрик? или еще добавить?
Прикрепленные файлы:
66. minimajack 53 29.01.19 11:25 Сейчас в теме
59. dmarenin 217 28.01.19 20:13 Сейчас в теме
(56) "Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером) " ну да дальше то сокет уже открыт, считай лонг пол для сторонних приложений. бля, до сих пор не понимаю. в чем вопрос то??
8. badboychik 60 28.01.19 15:34 Сейчас в теме
фух, поставил зукипер, кафку, кафкоманагер. Осталось компоненту внешнюю получить!
9. minimajack 53 28.01.19 15:37 Сейчас в теме
(8) не забудьте про питон и скрипты запуска вебсервера
15. dmarenin 217 28.01.19 16:34 Сейчас в теме
16. dmarenin 217 28.01.19 16:35 Сейчас в теме
(8) дак там ничего в ней нет. сишный tcp клиент под интерфейсом addin 1c. ну если сильно нужно то конечно могу приложить
17. dmarenin 217 28.01.19 16:37 Сейчас в теме
(16) код tcp клиента есть в статье. вообще на этом форуме возможно их есть с десяток.
10. comol 3825 28.01.19 15:57 Сейчас в теме
Так я не понял со стороны 1С сообщения получаются на tcp сервер? т.е. на стороне 1С должна быть поднята ВК с tcp сервером, так?
14. dmarenin 217 28.01.19 16:33 Сейчас в теме
(10) нет. не сервер, tcp клиент. в статье написано.
27. comol 3825 28.01.19 16:57 Сейчас в теме
(14)
Меня смутили:
1с обрабатывает внешние событие
внешнее событие генерит компонента?

tcp сервер отпраляет в сокет всем клиентам
клиенты же отправляют...
29. dmarenin 217 28.01.19 17:01 Сейчас в теме
(27) генерит компонента? да. клиенты же отправляют не всегда, в чем суть то и есть. поставщиком может быть как 1с так и не 1с. и клиенты чего?? в статье же разделение логики вроде как описывалось. есть часть которая отвечает только за отправку в сокеты для 1с, есть часть которая генерирует эти события, есть часть которая генерирует события которые нужно в сокет отправить. объяснил?
41. badboychik 60 28.01.19 18:55 Сейчас в теме
а потянет кафка перекидывания 1..2-х мегабайтными XML-ями?
44. dmarenin 217 28.01.19 19:00 Сейчас в теме
(41) потянет 100% (тут вопрос не кафке уже а в железе и оси), но лучше не так. создать файл, сделать ивент, получить ивент типа прочитай. (типа выставил указатель), получил ответ снял с регистрации, и тд. это как бы и есть событийная логика. а не гонять данные. на крайняк кафка может в стримы.
42. badboychik 60 28.01.19 18:56 Сейчас в теме
Еще вопрос, как можно в реальном времени смотреть за нагрузкой/очередями в кафке?
45. dmarenin 217 28.01.19 19:02 Сейчас в теме
(42) да на скринах есть(нет передачи потому что нужно включить логи). прям реал тайм график построить кафка манеджер не сможет, можно самому запилить там делов... еще вопросы?
49. badboychik 60 28.01.19 19:11 Сейчас в теме
(45) Как все это запускается? У вас все сервисы в докере? Вроде можно композером объединить все в одну кучу?
И кстати tcp-сервер на питоне надо в виде сервиса завернуть? или как запускать чтоб постоянно работал?
50. dmarenin 217 28.01.19 19:14 Сейчас в теме
(49) в скрипте работает постоянно. ибо main. в статье разные приложения. даже расписано что за чо отвечает. ключевое слово "шаг"

ответ на вопрос " кстати tcp-сервер на питоне надо в виде сервиса завернуть?" screen
54. badboychik 60 28.01.19 19:56 Сейчас в теме
(50) то что постоянно работает я понял, я имею в виду как сделать автостарт и фоновую работу всех служб. В винде можно специальной софтиной из любой программы сделать обычную службу. У вас все эти сервисы на линуксе как демоны или както по другому?
55. dmarenin 217 28.01.19 20:02 Сейчас в теме
(54) "У вас все эти сервисы" (он один контейнер из статьи) "на линуксе" -да, "как демоны" - нет. в винде батник нормально сработает. в дебе в полне себе баш сработает вида:

screen -dmUS queue выпелено
screen -dmUS server python3.6 /OnesSocketServer/OnesSocketServer/ones_socket_server.py
screen -dmUS proxy python3.6 /OnesSocketServer/ProxyOnesSocketServer/runserver.py
screen -dmUS consumer python3.6 /OnesSocketServer/ProxyOnesSocketServer/ones_socket_consumer­.py - это часть из статьи вынесенная отдельно для генерации событий отправки в сокет, можно так не делать, можно делать...
Оставьте свое сообщение